diff --git a/NEW/src/JdeScoping.DataSync/BulkCopyTypeRegistry.cs b/NEW/src/JdeScoping.DataSync/BulkCopyTypeRegistry.cs deleted file mode 100644 index bd5b5b0..0000000 --- a/NEW/src/JdeScoping.DataSync/BulkCopyTypeRegistry.cs +++ /dev/null @@ -1,29 +0,0 @@ -using JdeScoping.Core.Models.Inventory; -using JdeScoping.Core.Models.Organization; -using JdeScoping.Core.Models.Quality; -using JdeScoping.Core.Models.WorkOrders; - -namespace JdeScoping.DataSync; - -/// -/// Registry of types that support bulk copy operations. -/// The source generator analyzes this list to generate IDataReader implementations. -/// -public static class BulkCopyTypeRegistry -{ - /// - /// Types that support bulk copy. Add new types here to generate converters. - /// - public static readonly Type[] Types = - [ - typeof(WorkOrder), - typeof(Lot), - typeof(LotUsage), - typeof(Item), - typeof(WorkCenter), - typeof(ProfitCenter), - typeof(JdeUser), - typeof(Branch), - typeof(MisData), - ]; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/BranchMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/BranchMergeConfiguration.cs deleted file mode 100644 index edd5217..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/BranchMergeConfiguration.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for Branch entities. -/// -public sealed class BranchMergeConfiguration : IMergeConfiguration -{ - public string TableName => "Branch"; - - public Expression> MatchOn => - x => x.Code; - - public Expression>? UpdateColumns => - x => x.Description; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ItemMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ItemMergeConfiguration.cs deleted file mode 100644 index 095224a..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ItemMergeConfiguration.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for Item entities. -/// -public sealed class ItemMergeConfiguration : IMergeConfiguration -{ - public string TableName => "Item"; - - public Expression> MatchOn => - x => x.ShortItemNumber; - - public Expression>? UpdateColumns => - x => new - { - x.ItemNumber, - x.Description, - x.PlanningFamily, - x.StockingType - }; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/JdeUserMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/JdeUserMergeConfiguration.cs deleted file mode 100644 index ffb9473..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/JdeUserMergeConfiguration.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for JdeUser entities. -/// -public sealed class JdeUserMergeConfiguration : IMergeConfiguration -{ - public string TableName => "JdeUser"; - - public Expression> MatchOn => - x => x.AddressNumber; - - public Expression>? UpdateColumns => - x => new { x.UserId, x.FullName }; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotMergeConfiguration.cs deleted file mode 100644 index 6ef5862..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotMergeConfiguration.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for Lot entities. -/// -public sealed class LotMergeConfiguration : IMergeConfiguration -{ - public string TableName => "Lot"; - - public Expression> MatchOn => - x => new { x.LotNumber, x.BranchCode }; - - public Expression>? UpdateColumns => - x => new - { - x.ShortItemNumber, - x.ItemNumber, - x.SupplierCode, - x.StatusCode, - x.Memo1, - x.Memo2, - x.Memo3 - }; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotUsageMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotUsageMergeConfiguration.cs deleted file mode 100644 index 5a77634..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/LotUsageMergeConfiguration.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for LotUsage entities. -/// -public sealed class LotUsageMergeConfiguration : IMergeConfiguration -{ - public string TableName => "LotUsage"; - - public Expression> MatchOn => - x => x.UniqueId; - - public Expression>? UpdateColumns => - x => new - { - x.WorkOrderNumber, - x.LotNumber, - x.BranchCode, - x.ShortItemNumber, - x.Quantity - }; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/MisDataMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/MisDataMergeConfiguration.cs deleted file mode 100644 index f06aa3b..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/MisDataMergeConfiguration.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Quality; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for MisData entities. -/// -public sealed class MisDataMergeConfiguration : IMergeConfiguration -{ - public string TableName => "MisData"; - - public Expression> MatchOn => - x => new { x.ItemNumber, x.BranchCode, x.SequenceNumber, x.MisNumber, x.CharNumber }; - - public Expression>? UpdateColumns => - x => new - { - x.RevId, - x.TestDescription, - x.SamplingType, - x.SamplingValue, - x.ToolsGauges, - x.WorkInstructions, - x.Status, - x.ReleaseDate - }; - - // MisData doesn't have LastUpdateDt, so always update on match - public Expression>? UpdateWhen => null; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ProfitCenterMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ProfitCenterMergeConfiguration.cs deleted file mode 100644 index ebdf742..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/ProfitCenterMergeConfiguration.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for ProfitCenter entities. -/// -public sealed class ProfitCenterMergeConfiguration : IMergeConfiguration -{ - public string TableName => "ProfitCenter"; - - public Expression> MatchOn => - x => x.Code; - - public Expression>? UpdateColumns => - x => x.Description; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkCenterMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkCenterMergeConfiguration.cs deleted file mode 100644 index ffe0713..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkCenterMergeConfiguration.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for WorkCenter entities. -/// -public sealed class WorkCenterMergeConfiguration : IMergeConfiguration -{ - public string TableName => "WorkCenter"; - - public Expression> MatchOn => - x => x.Code; - - public Expression>? UpdateColumns => - x => x.Description; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; -} diff --git a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkOrderMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkOrderMergeConfiguration.cs deleted file mode 100644 index b53b55d..0000000 --- a/NEW/src/JdeScoping.DataSync/Configuration/MergeConfigurations/WorkOrderMergeConfiguration.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.Core.Models.WorkOrders; -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Configuration.MergeConfigurations; - -/// -/// Merge configuration for WorkOrder entities. -/// -public sealed class WorkOrderMergeConfiguration : IMergeConfiguration -{ - public string TableName => "WorkOrder"; - - public Expression> MatchOn => - x => new { x.WorkOrderNumber, x.BranchCode }; - - public Expression>? UpdateColumns => - x => new - { - x.LotNumber, - x.ItemNumber, - x.ShortItemNumber, - x.ParentWorkOrderNumber, - x.OrderQuantity, - x.HeldQuantity, - x.ShippedQuantity, - x.StatusCode, - x.StatusCodeUpdateDt, - x.IssueDate, - x.StartDate, - x.RoutingType - }; - - public Expression>? UpdateWhen => - (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt; - - public Expression>? InsertColumns => null; // All columns -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IBulkMergeHelper.cs b/NEW/src/JdeScoping.DataSync/Contracts/IBulkMergeHelper.cs deleted file mode 100644 index 31b3e5e..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IBulkMergeHelper.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System.Linq.Expressions; -using JdeScoping.DataSync.Models; - -namespace JdeScoping.DataSync.Contracts; - -/// -/// Helper for performing bulk merge operations from async enumerable sources to SQL Server tables. -/// -public interface IBulkMergeHelper -{ - /// - /// Merges data from an async enumerable source into a destination table. - /// - /// The entity type. - /// The source data to merge. - /// The destination SQL table name. - /// Expression defining the columns to match on (primary key). - /// Expression defining which columns to update on match. Null means all non-PK columns. - /// Optional condition for when to perform updates. If null, always updates on match. - /// Expression defining which columns to insert. Null means all columns. - /// Optional temp table name. Defaults to #TEMP_{destinationTable}. - /// Number of rows per batch. 0 means process all at once. - /// If true, validates data against schema before bulk copy. - /// Cancellation token. - /// Result containing row counts and timing information. - Task MergeAsync( - IAsyncEnumerable data, - string destinationTable, - Expression> matchOn, - Expression>? updateColumns = null, - Expression>? updateWhen = null, - Expression>? insertColumns = null, - string? tempTableName = null, - int batchSize = 0, - bool validateBeforeCopy = false, - CancellationToken cancellationToken = default) where T : class; - - /// - /// Performs a mass insert (full table refresh) with optional index management. - /// Truncates table, disables non-clustered indexes, bulk copies data, rebuilds indexes. - /// - /// The entity type. - /// The source data to insert. - /// The destination SQL table name. - /// If true, rebuilds indexes after insert. If false, just re-enables them. - /// Number of rows per bulk copy batch. 0 = default (10000). - /// Cancellation token. - /// Result containing row count and timing. - Task MassInsertAsync( - IAsyncEnumerable data, - string destinationTable, - bool rebuildIndexes = true, - int batchSize = 0, - CancellationToken cancellationToken = default) where T : class; -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IDataFetcher.cs b/NEW/src/JdeScoping.DataSync/Contracts/IDataFetcher.cs deleted file mode 100644 index cc8ffeb..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IDataFetcher.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace JdeScoping.DataSync.Contracts; - -/// -/// Interface for fetching data from source systems (JDE/CMS). -/// -/// The entity type being fetched. -public interface IDataFetcher where TEntity : class -{ - /// - /// Fetches entities from source system as an async stream. - /// - /// For incremental fetches, only return records modified after this time. Null for full fetch. - /// Cancellation token for graceful shutdown. - /// Async enumerable of entities, streamed from source. - IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - CancellationToken cancellationToken = default); - - /// - /// Gets the entity type name for logging purposes. - /// - string EntityTypeName => typeof(TEntity).Name; -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IDataReaderFactory.cs b/NEW/src/JdeScoping.DataSync/Contracts/IDataReaderFactory.cs deleted file mode 100644 index 6d909d4..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IDataReaderFactory.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System.Data; - -namespace JdeScoping.DataSync.Contracts; - -/// -/// Factory for creating IDataReader instances from IAsyncEnumerable sources. -/// Implementations are typically source-generated. -/// -public interface IDataReaderFactory -{ - /// - /// Creates an IDataReader that wraps the async enumerable source for use with SqlBulkCopy. - /// - /// The entity type. - /// The async enumerable data source. - /// An IDataReader that can be passed to SqlBulkCopy. - /// Thrown when no converter exists for type T. - IDataReader CreateReader(IAsyncEnumerable source) where T : class; - - /// - /// Gets the column names for a given entity type in ordinal order. - /// - /// The entity type. - /// List of column names matching the IDataReader column ordinals. - IReadOnlyList GetColumnNames() where T : class; -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfiguration.cs b/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfiguration.cs deleted file mode 100644 index 1027be3..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfiguration.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System.Linq.Expressions; - -namespace JdeScoping.DataSync.Contracts; - -/// -/// Defines merge configuration for an entity type. -/// -/// The entity type. -public interface IMergeConfiguration where T : class -{ - /// - /// Gets the destination table name in SQL Server. - /// - string TableName { get; } - - /// - /// Gets the expression defining columns to match on (primary key). - /// - Expression> MatchOn { get; } - - /// - /// Gets the expression defining columns to update when matched. - /// Null means all non-PK columns. - /// - Expression>? UpdateColumns { get; } - - /// - /// Gets the condition for when to perform updates. - /// Null means always update on match. - /// - Expression>? UpdateWhen { get; } - - /// - /// Gets the expression defining columns to insert. - /// Null means all columns. - /// - Expression>? InsertColumns { get; } -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfigurationRegistry.cs b/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfigurationRegistry.cs deleted file mode 100644 index 0d895be..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IMergeConfigurationRegistry.cs +++ /dev/null @@ -1,22 +0,0 @@ -namespace JdeScoping.DataSync.Contracts; - -/// -/// Registry for looking up merge configurations by entity type. -/// -public interface IMergeConfigurationRegistry -{ - /// - /// Gets the merge configuration for the specified entity type. - /// - /// The entity type. - /// The merge configuration. - /// Thrown if no configuration is registered. - IMergeConfiguration GetConfiguration() where T : class; - - /// - /// Checks if a merge configuration exists for the specified entity type. - /// - /// The entity type. - /// True if configuration exists. - bool HasConfiguration() where T : class; -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IPostProcessor.cs b/NEW/src/JdeScoping.DataSync/Contracts/IPostProcessor.cs deleted file mode 100644 index 84f8a29..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/IPostProcessor.cs +++ /dev/null @@ -1,15 +0,0 @@ -namespace JdeScoping.DataSync.Contracts; - -/// -/// Interface for post-processing operations after data sync. -/// -public interface IPostProcessor -{ - /// - /// Executes post-processing logic after data has been synced to the table. - /// - /// The table that was synced. - /// Cancellation token. - /// A task representing the asynchronous operation. - Task ProcessAsync(string tableName, CancellationToken cancellationToken = default); -} diff --git a/NEW/src/JdeScoping.DataSync/Contracts/ISchemaValidator.cs b/NEW/src/JdeScoping.DataSync/Contracts/ISchemaValidator.cs deleted file mode 100644 index 8ad3331..0000000 --- a/NEW/src/JdeScoping.DataSync/Contracts/ISchemaValidator.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System.Data; -using JdeScoping.DataSync.Exceptions; -using JdeScoping.DataSync.Models; - -namespace JdeScoping.DataSync.Contracts; - -/// -/// Validates data against database table schema. -/// -public interface ISchemaValidator -{ - /// - /// Gets the schema for a table. - /// - /// Database connection. - /// Table name. - /// Cancellation token. - /// List of column schemas. - Task> GetTableSchemaAsync( - IDbConnection connection, - string tableName, - CancellationToken cancellationToken = default); - - /// - /// Validates a batch of data against the table schema. - /// - /// Entity type. - /// Data to validate. - /// Column schemas to validate against. - /// Maximum number of errors to collect before stopping (0 = unlimited). - /// List of validation errors. - IReadOnlyList ValidateBatch( - IReadOnlyList data, - IReadOnlyList schema, - int maxErrors = 100) where T : class; -} diff --git a/NEW/src/JdeScoping.DataSync/Exceptions/BulkMergeException.cs b/NEW/src/JdeScoping.DataSync/Exceptions/BulkMergeException.cs deleted file mode 100644 index 912d094..0000000 --- a/NEW/src/JdeScoping.DataSync/Exceptions/BulkMergeException.cs +++ /dev/null @@ -1,80 +0,0 @@ -namespace JdeScoping.DataSync.Exceptions; - -/// -/// Exception thrown when a bulk merge operation fails. -/// -public class BulkMergeException : Exception -{ - /// - /// The destination table name. - /// - public string TableName { get; init; } = string.Empty; - - /// - /// The batch number that failed (1-based). - /// - public int BatchNumber { get; init; } - - /// - /// Number of rows in the failed batch. - /// - public int RowsInBatch { get; init; } - - /// - /// The SQL statement that was being executed when the error occurred. - /// - public string? SqlStatement { get; init; } - - public BulkMergeException() - { - } - - public BulkMergeException(string message) : base(message) - { - } - - public BulkMergeException(string message, Exception innerException) : base(message, innerException) - { - } -} - -/// -/// Exception thrown when validation fails before bulk copy. -/// -public class BulkMergeValidationException : BulkMergeException -{ - /// - /// The validation errors that were found. - /// - public IReadOnlyList Errors { get; init; } = []; - - public BulkMergeValidationException() - { - } - - public BulkMergeValidationException(string message) : base(message) - { - } - - public BulkMergeValidationException(string message, IReadOnlyList errors) : base(message) - { - Errors = errors; - } - - public BulkMergeValidationException(string message, Exception innerException) : base(message, innerException) - { - } -} - -/// -/// Represents a validation error for a specific row and column. -/// -/// Zero-based row index in the batch. -/// The column name that failed validation. -/// The value that failed validation. -/// Human-readable error message. -public record ValidationError( - int RowIndex, - string ColumnName, - object? Value, - string Message); diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Cms/CmsMisDataFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Cms/CmsMisDataFetcher.cs deleted file mode 100644 index 144bd7b..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Cms/CmsMisDataFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Quality; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Cms; - -/// -/// Fetches MIS data from CMS (Sybase/Oracle). -/// -public class CmsMisDataFetcher : IDataFetcher -{ - private readonly ICmsDataSource _cmsDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public CmsMisDataFetcher( - ICmsDataSource cmsDataSource, - ILogger logger) - { - _cmsDataSource = cmsDataSource ?? throw new ArgumentNullException(nameof(cmsDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching MIS data from CMS, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var misData in _cmsDataSource.GetMisDataAsync(minimumDt, cancellationToken)) - { - yield return misData; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeBranchFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeBranchFetcher.cs deleted file mode 100644 index f3a8f0d..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeBranchFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches branch data from JDE. -/// -public class JdeBranchFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeBranchFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching branches from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var branch in _jdeDataSource.GetBranchesAsync(minimumDt, cancellationToken)) - { - yield return branch; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeItemFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeItemFetcher.cs deleted file mode 100644 index 0ffa742..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeItemFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches item master data from JDE. -/// -public class JdeItemFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeItemFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching items from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var item in _jdeDataSource.GetItemsAsync(minimumDt, cancellationToken)) - { - yield return item; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotFetcher.cs deleted file mode 100644 index fcbb304..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches lot master data from JDE. -/// -public class JdeLotFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeLotFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching lots from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var lot in _jdeDataSource.GetLotsAsync(minimumDt, cancellationToken)) - { - yield return lot; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotUsageFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotUsageFetcher.cs deleted file mode 100644 index fdfe435..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeLotUsageFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Inventory; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches lot usage (cardex) data from JDE. -/// -public class JdeLotUsageFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeLotUsageFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching lot usages from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var lotUsage in _jdeDataSource.GetLotUsagesAsync(minimumDt, cancellationToken)) - { - yield return lotUsage; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeProfitCenterFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeProfitCenterFetcher.cs deleted file mode 100644 index 12452f9..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeProfitCenterFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches profit center data from JDE. -/// -public class JdeProfitCenterFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeProfitCenterFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching profit centers from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var profitCenter in _jdeDataSource.GetProfitCentersAsync(minimumDt, cancellationToken)) - { - yield return profitCenter; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeUserFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeUserFetcher.cs deleted file mode 100644 index bfd7d8a..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeUserFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches JDE user data from JDE. -/// -public class JdeUserFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeUserFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching JDE users, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var user in _jdeDataSource.GetUsersAsync(minimumDt, cancellationToken)) - { - yield return user; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkCenterFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkCenterFetcher.cs deleted file mode 100644 index 82e3a31..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkCenterFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.Organization; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches work center data from JDE. -/// -public class JdeWorkCenterFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeWorkCenterFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching work centers from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var workCenter in _jdeDataSource.GetWorkCentersAsync(minimumDt, cancellationToken)) - { - yield return workCenter; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkOrderFetcher.cs b/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkOrderFetcher.cs deleted file mode 100644 index df221ad..0000000 --- a/NEW/src/JdeScoping.DataSync/Fetchers/Jde/JdeWorkOrderFetcher.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Runtime.CompilerServices; -using JdeScoping.Core.Interfaces; -using JdeScoping.Core.Models; -using JdeScoping.Core.Models.WorkOrders; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Fetchers.Jde; - -/// -/// Fetches work order data from JDE. -/// -public class JdeWorkOrderFetcher : IDataFetcher -{ - private readonly IJdeDataSource _jdeDataSource; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public JdeWorkOrderFetcher( - IJdeDataSource jdeDataSource, - ILogger logger) - { - _jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async IAsyncEnumerable FetchAsync( - DateTime? minimumDt, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - _logger.LogDebug("Fetching work orders from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null"); - - await foreach (var workOrder in _jdeDataSource.GetWorkOrdersAsync(minimumDt, cancellationToken)) - { - yield return workOrder; - } - } -} diff --git a/NEW/src/JdeScoping.DataSync/Models/ColumnSchema.cs b/NEW/src/JdeScoping.DataSync/Models/ColumnSchema.cs deleted file mode 100644 index f080b7e..0000000 --- a/NEW/src/JdeScoping.DataSync/Models/ColumnSchema.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace JdeScoping.DataSync.Models; - -/// -/// Represents the schema of a database column. -/// -/// Column name. -/// SQL Server data type name (e.g., "nvarchar", "int", "decimal"). -/// Maximum length for string columns (-1 for MAX). -/// Precision for decimal columns. -/// Scale for decimal columns. -/// Whether the column allows null values. -/// Column ordinal position (1-based). -public record ColumnSchema( - string Name, - string DataType, - int? MaxLength, - int? Precision, - int? Scale, - bool IsNullable, - int OrdinalPosition); diff --git a/NEW/src/JdeScoping.DataSync/Models/MergeResult.cs b/NEW/src/JdeScoping.DataSync/Models/MergeResult.cs deleted file mode 100644 index 736fb64..0000000 --- a/NEW/src/JdeScoping.DataSync/Models/MergeResult.cs +++ /dev/null @@ -1,33 +0,0 @@ -namespace JdeScoping.DataSync.Models; - -/// -/// Result of a bulk merge operation. -/// -/// Total number of rows processed from source. -/// Number of rows inserted (new records). -/// Number of rows updated (existing records). -/// Number of batches processed. -/// Total elapsed time for the operation. -public record MergeResult( - int TotalRowsProcessed, - int RowsInserted, - int RowsUpdated, - int BatchCount, - TimeSpan Elapsed) -{ - /// - /// Total rows affected (inserted + updated). - /// - public int TotalRowsAffected => RowsInserted + RowsUpdated; -} - -/// -/// Result of a mass insert operation. -/// -/// Total rows inserted. -/// Total elapsed time. -/// Whether indexes were rebuilt (vs just re-enabled). -public record MassInsertResult( - long TotalRowsInserted, - TimeSpan Elapsed, - bool IndexesRebuilt); diff --git a/NEW/src/JdeScoping.DataSync/Services/BulkMergeHelper.cs b/NEW/src/JdeScoping.DataSync/Services/BulkMergeHelper.cs deleted file mode 100644 index 8556b25..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/BulkMergeHelper.cs +++ /dev/null @@ -1,432 +0,0 @@ -using System.Diagnostics; -using System.Linq.Expressions; -using Dapper; -using JdeScoping.DataAccess.Interfaces; -using JdeScoping.DataSync.Contracts; -using JdeScoping.DataSync.Exceptions; -using JdeScoping.DataSync.Models; -using Microsoft.Data.SqlClient; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Services; - -/// -/// Implements bulk merge operations using temp tables and SQL MERGE statements. -/// -internal sealed class BulkMergeHelper : IBulkMergeHelper -{ - private const int DefaultBatchSize = 10000; - - private readonly IDbConnectionFactory _connectionFactory; - private readonly IDataReaderFactory _dataReaderFactory; - private readonly ISchemaValidator _schemaValidator; - private readonly ILogger _logger; - - public BulkMergeHelper( - IDbConnectionFactory connectionFactory, - IDataReaderFactory dataReaderFactory, - ISchemaValidator schemaValidator, - ILogger logger) - { - _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); - _dataReaderFactory = dataReaderFactory ?? throw new ArgumentNullException(nameof(dataReaderFactory)); - _schemaValidator = schemaValidator ?? throw new ArgumentNullException(nameof(schemaValidator)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async Task MergeAsync( - IAsyncEnumerable data, - string destinationTable, - Expression> matchOn, - Expression>? updateColumns = null, - Expression>? updateWhen = null, - Expression>? insertColumns = null, - string? tempTableName = null, - int batchSize = 0, - bool validateBeforeCopy = false, - CancellationToken cancellationToken = default) where T : class - { - ArgumentNullException.ThrowIfNull(data); - ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable); - ArgumentNullException.ThrowIfNull(matchOn); - - var stopwatch = Stopwatch.StartNew(); - var effectiveBatchSize = batchSize > 0 ? batchSize : DefaultBatchSize; - var effectiveTempTable = tempTableName ?? $"#TEMP_{destinationTable.Replace(".", "_").Replace("[", "").Replace("]", "")}"; - - // Parse expressions to get column names - var matchColumnNames = ExpressionParser.GetColumnNames(matchOn); - var allColumns = _dataReaderFactory.GetColumnNames(); - - // Default update columns = all columns except match columns - var updateColumnNames = updateColumns != null - ? ExpressionParser.GetColumnNames(updateColumns) - : allColumns.Where(c => !matchColumnNames.Contains(c, StringComparer.OrdinalIgnoreCase)).ToList(); - - // Default insert columns = all columns - var insertColumnNames = insertColumns != null - ? ExpressionParser.GetColumnNames(insertColumns) - : allColumns; - - // Build the updateWhen SQL clause if provided - var updateWhenSql = ExpressionParser.BuildUpdateWhenSql(updateWhen); - - _logger.LogDebug( - "Starting bulk merge to {DestinationTable}. BatchSize={BatchSize}, MatchColumns={MatchColumns}, TempTable={TempTable}", - destinationTable, effectiveBatchSize, string.Join(",", matchColumnNames), effectiveTempTable); - - await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); - - // Get schema for validation if needed - IReadOnlyList? schema = null; - if (validateBeforeCopy) - { - schema = await _schemaValidator.GetTableSchemaAsync(connection, destinationTable, cancellationToken); - } - - int totalRows = 0; - int totalInserted = 0; - int totalUpdated = 0; - int batchCount = 0; - bool tempTableCreated = false; - - try - { - // Create temp table with same schema as destination - await CreateTempTableAsync(connection, effectiveTempTable, destinationTable, cancellationToken); - tempTableCreated = true; - - // Build MERGE SQL once - var mergeSql = MergeSqlBuilder.BuildMergeSimple( - destinationTable, - effectiveTempTable, - matchColumnNames, - updateColumnNames, - updateWhenSql, - insertColumnNames); - - // Process data in batches - var batch = new List(effectiveBatchSize); - - await foreach (var item in data.WithCancellation(cancellationToken)) - { - batch.Add(item); - - if (batch.Count >= effectiveBatchSize) - { - batchCount++; - var (inserted, updated) = await ProcessBatchAsync( - connection, batch, effectiveTempTable, destinationTable, - mergeSql, schema, batchCount, cancellationToken); - - totalRows += batch.Count; - totalInserted += inserted; - totalUpdated += updated; - batch.Clear(); - } - } - - // Process remaining items - if (batch.Count > 0) - { - batchCount++; - var (inserted, updated) = await ProcessBatchAsync( - connection, batch, effectiveTempTable, destinationTable, - mergeSql, schema, batchCount, cancellationToken); - - totalRows += batch.Count; - totalInserted += inserted; - totalUpdated += updated; - } - - stopwatch.Stop(); - - _logger.LogInformation( - "Bulk merge to {DestinationTable} completed. TotalRows={TotalRows}, Inserted={Inserted}, Updated={Updated}, Batches={Batches}, Elapsed={Elapsed}ms", - destinationTable, totalRows, totalInserted, totalUpdated, batchCount, stopwatch.ElapsedMilliseconds); - - return new MergeResult(totalRows, totalInserted, totalUpdated, batchCount, stopwatch.Elapsed); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - _logger.LogError(ex, "Bulk merge to {DestinationTable} failed after {Batches} batches and {TotalRows} rows", - destinationTable, batchCount, totalRows); - throw; - } - finally - { - // Always try to drop the temp table - if (tempTableCreated) - { - try - { - await DropTempTableAsync(connection, effectiveTempTable, CancellationToken.None); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Failed to drop temp table {TempTable}", effectiveTempTable); - } - } - } - } - - private async Task<(int Inserted, int Updated)> ProcessBatchAsync( - SqlConnection connection, - IReadOnlyList batch, - string tempTableName, - string destinationTable, - string mergeSql, - IReadOnlyList? schema, - int batchNumber, - CancellationToken cancellationToken) where T : class - { - try - { - // Validate if schema available - if (schema != null) - { - var errors = _schemaValidator.ValidateBatch(batch, schema); - if (errors.Count > 0) - { - throw new BulkMergeValidationException( - $"Validation failed for batch {batchNumber} with {errors.Count} errors.", - errors) - { - TableName = destinationTable, - BatchNumber = batchNumber, - RowsInBatch = batch.Count - }; - } - } - - // Bulk copy to temp table - await BulkCopyAsync(connection, batch, tempTableName, cancellationToken); - - // Execute MERGE - var rowsAffected = await connection.ExecuteAsync( - new CommandDefinition(mergeSql, cancellationToken: cancellationToken)); - - // Truncate temp table for next batch - await connection.ExecuteAsync( - new CommandDefinition( - MergeSqlBuilder.BuildTruncateTempTable(tempTableName), - cancellationToken: cancellationToken)); - - _logger.LogDebug( - "Batch {BatchNumber} completed. Rows={RowCount}, Affected={RowsAffected}", - batchNumber, batch.Count, rowsAffected); - - // Note: The simple MERGE doesn't track insert vs update counts - // We return affected rows as inserts for now - could enhance MERGE to track this - return (rowsAffected, 0); - } - catch (SqlException ex) - { - throw new BulkMergeException( - $"SQL error during batch {batchNumber}: {ex.Message}", ex) - { - TableName = destinationTable, - BatchNumber = batchNumber, - RowsInBatch = batch.Count, - SqlStatement = mergeSql - }; - } - } - - private async Task BulkCopyAsync( - SqlConnection connection, - IReadOnlyList batch, - string tempTableName, - CancellationToken cancellationToken) where T : class - { - using var bulkCopy = new SqlBulkCopy(connection) - { - DestinationTableName = tempTableName, - BatchSize = batch.Count - }; - - // Create reader from the batch list - using var reader = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable()); - await bulkCopy.WriteToServerAsync(reader, cancellationToken); - } - - private static async Task CreateTempTableAsync( - SqlConnection connection, - string tempTableName, - string sourceTableName, - CancellationToken cancellationToken) - { - var sql = MergeSqlBuilder.BuildCreateTempTable(tempTableName, sourceTableName); - await connection.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken)); - } - - private static async Task DropTempTableAsync( - SqlConnection connection, - string tempTableName, - CancellationToken cancellationToken) - { - var sql = MergeSqlBuilder.BuildDropTempTable(tempTableName); - await connection.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken)); - } - - /// - public async Task MassInsertAsync( - IAsyncEnumerable data, - string destinationTable, - bool rebuildIndexes = true, - int batchSize = 0, - CancellationToken cancellationToken = default) where T : class - { - ArgumentNullException.ThrowIfNull(data); - ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable); - - var stopwatch = Stopwatch.StartNew(); - var effectiveBatchSize = batchSize > 0 ? batchSize : DefaultBatchSize; - - _logger.LogInformation( - "Starting mass insert to {DestinationTable}. BatchSize={BatchSize}, RebuildIndexes={RebuildIndexes}", - destinationTable, effectiveBatchSize, rebuildIndexes); - - await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); - - long totalRows = 0; - - try - { - // Step 1: Disable non-clustered indexes - await DisableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken); - - // Step 2: Truncate the table - await connection.ExecuteAsync( - new CommandDefinition($"TRUNCATE TABLE [{destinationTable}]", cancellationToken: cancellationToken)); - - _logger.LogDebug("Truncated table {DestinationTable}", destinationTable); - - // Step 3: Bulk copy data in batches - var batch = new List(effectiveBatchSize); - - await foreach (var item in data.WithCancellation(cancellationToken)) - { - batch.Add(item); - - if (batch.Count >= effectiveBatchSize) - { - await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken); - totalRows += batch.Count; - _logger.LogDebug("Inserted batch of {Count} rows to {Table}", batch.Count, destinationTable); - batch.Clear(); - } - } - - // Insert remaining rows - if (batch.Count > 0) - { - await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken); - totalRows += batch.Count; - _logger.LogDebug("Inserted final batch of {Count} rows to {Table}", batch.Count, destinationTable); - } - - // Step 4: Rebuild or re-enable indexes - if (rebuildIndexes) - { - await RebuildIndexesAsync(connection, destinationTable, cancellationToken); - } - else - { - await EnableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken); - } - - stopwatch.Stop(); - - _logger.LogInformation( - "Mass insert to {DestinationTable} completed. TotalRows={TotalRows}, Elapsed={Elapsed}ms", - destinationTable, totalRows, stopwatch.ElapsedMilliseconds); - - return new MassInsertResult(totalRows, stopwatch.Elapsed, rebuildIndexes); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - _logger.LogError(ex, "Mass insert to {DestinationTable} failed after {TotalRows} rows", - destinationTable, totalRows); - - // Try to re-enable indexes even on failure - try - { - await EnableNonClusteredIndexesAsync(connection, destinationTable, CancellationToken.None); - } - catch (Exception indexEx) - { - _logger.LogWarning(indexEx, "Failed to re-enable indexes on {Table} after error", destinationTable); - } - - throw; - } - } - - private async Task BulkCopyDirectAsync( - SqlConnection connection, - IReadOnlyList batch, - string destinationTable, - CancellationToken cancellationToken) where T : class - { - using var bulkCopy = new SqlBulkCopy(connection) - { - DestinationTableName = $"[{destinationTable}]", - BatchSize = batch.Count, - BulkCopyTimeout = 3600 - }; - - using var reader = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable()); - await bulkCopy.WriteToServerAsync(reader, cancellationToken); - } - - private static async Task DisableNonClusteredIndexesAsync( - SqlConnection connection, - string tableName, - CancellationToken cancellationToken) - { - var sql = $@" -DECLARE @sql NVARCHAR(MAX) = ''; -SELECT @sql = @sql + 'ALTER INDEX [' + i.name + '] ON [{tableName}] DISABLE;' + CHAR(13) -FROM sys.indexes i -INNER JOIN sys.tables t ON i.object_id = t.object_id -WHERE t.name = @tableName - AND i.type = 2 - AND i.is_disabled = 0; -EXEC sp_executesql @sql;"; - - await connection.ExecuteAsync( - new CommandDefinition(sql, new { tableName }, commandTimeout: 300, cancellationToken: cancellationToken)); - } - - private static async Task EnableNonClusteredIndexesAsync( - SqlConnection connection, - string tableName, - CancellationToken cancellationToken) - { - var sql = $@" -DECLARE @sql NVARCHAR(MAX) = ''; -SELECT @sql = @sql + 'ALTER INDEX [' + i.name + '] ON [{tableName}] REBUILD;' + CHAR(13) -FROM sys.indexes i -INNER JOIN sys.tables t ON i.object_id = t.object_id -WHERE t.name = @tableName - AND i.type = 2 - AND i.is_disabled = 1; -EXEC sp_executesql @sql;"; - - await connection.ExecuteAsync( - new CommandDefinition(sql, new { tableName }, commandTimeout: 3600, cancellationToken: cancellationToken)); - } - - private static async Task RebuildIndexesAsync( - SqlConnection connection, - string tableName, - CancellationToken cancellationToken) - { - var sql = $"ALTER INDEX ALL ON [{tableName}] REBUILD WITH (FILLFACTOR = 95)"; - - await connection.ExecuteAsync( - new CommandDefinition(sql, commandTimeout: 3600, cancellationToken: cancellationToken)); - } -} diff --git a/NEW/src/JdeScoping.DataSync/Services/ExpressionParser.cs b/NEW/src/JdeScoping.DataSync/Services/ExpressionParser.cs deleted file mode 100644 index 6525321..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/ExpressionParser.cs +++ /dev/null @@ -1,192 +0,0 @@ -using System.Linq.Expressions; -using System.Reflection; - -namespace JdeScoping.DataSync.Services; - -/// -/// Parses LINQ expressions to extract column names and build SQL clauses. -/// -internal static class ExpressionParser -{ - /// - /// Extracts column names from an expression like: x => new { x.A, x.B } or x => x.Id - /// - public static IReadOnlyList GetColumnNames(Expression> expression) - { - ArgumentNullException.ThrowIfNull(expression); - - var body = expression.Body; - - // Handle conversion (boxing for value types) - if (body is UnaryExpression unary && unary.NodeType == ExpressionType.Convert) - { - body = unary.Operand; - } - - return body switch - { - // Single property: x => x.Id - MemberExpression member => [GetPropertyName(member)], - - // Anonymous type: x => new { x.A, x.B } - NewExpression newExpr => ExtractFromNewExpression(newExpr), - - // Member init (shouldn't typically happen but handle it) - MemberInitExpression memberInit => ExtractFromNewExpression(memberInit.NewExpression), - - _ => throw new ArgumentException( - $"Unsupported expression type: {body.NodeType}. " + - "Use either a single property (x => x.Id) or an anonymous type (x => new {{ x.A, x.B }}).", - nameof(expression)) - }; - } - - /// - /// Builds a SQL condition clause from an updateWhen expression. - /// Example: (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt - /// Returns: "source.[LastUpdateDt] > target.[LastUpdateDt]" - /// - public static string? BuildUpdateWhenSql( - Expression>? expression, - string sourceAlias = "source", - string targetAlias = "target") - { - if (expression == null) - return null; - - var sourceParam = expression.Parameters[0].Name!; - var targetParam = expression.Parameters[1].Name!; - - return TranslateExpression(expression.Body, sourceParam, sourceAlias, targetParam, targetAlias); - } - - private static IReadOnlyList ExtractFromNewExpression(NewExpression newExpr) - { - if (newExpr.Arguments.Count == 0) - { - throw new ArgumentException("Anonymous type expression must contain at least one property."); - } - - var columns = new List(); - foreach (var arg in newExpr.Arguments) - { - if (arg is MemberExpression member) - { - columns.Add(GetPropertyName(member)); - } - else - { - throw new ArgumentException( - $"Each property in the anonymous type must be a direct property access. " + - $"Found: {arg.NodeType}"); - } - } - - return columns; - } - - private static string GetPropertyName(MemberExpression member) - { - // Ensure it's a property access on the parameter - if (member.Expression is not ParameterExpression) - { - throw new ArgumentException( - $"Only direct property access is supported. " + - $"Nested properties like x.Parent.Name are not allowed. Found: {member}"); - } - - if (member.Member is not PropertyInfo prop) - { - throw new ArgumentException( - $"Expression must access a property, not a field. Found: {member.Member.MemberType}"); - } - - return prop.Name; - } - - private static string TranslateExpression( - Expression expr, - string sourceParam, string sourceAlias, - string targetParam, string targetAlias) - { - return expr switch - { - BinaryExpression binary => TranslateBinaryExpression( - binary, sourceParam, sourceAlias, targetParam, targetAlias), - - MemberExpression member => TranslateMemberExpression( - member, sourceParam, sourceAlias, targetParam, targetAlias), - - ConstantExpression constant => TranslateConstant(constant), - - UnaryExpression unary when unary.NodeType == ExpressionType.Not => - $"NOT ({TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias)})", - - UnaryExpression unary when unary.NodeType == ExpressionType.Convert => - TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias), - - _ => throw new NotSupportedException($"Expression type {expr.NodeType} is not supported in updateWhen clause.") - }; - } - - private static string TranslateBinaryExpression( - BinaryExpression binary, - string sourceParam, string sourceAlias, - string targetParam, string targetAlias) - { - var left = TranslateExpression(binary.Left, sourceParam, sourceAlias, targetParam, targetAlias); - var right = TranslateExpression(binary.Right, sourceParam, sourceAlias, targetParam, targetAlias); - - var op = binary.NodeType switch - { - ExpressionType.Equal => "=", - ExpressionType.NotEqual => "<>", - ExpressionType.GreaterThan => ">", - ExpressionType.GreaterThanOrEqual => ">=", - ExpressionType.LessThan => "<", - ExpressionType.LessThanOrEqual => "<=", - ExpressionType.AndAlso => "AND", - ExpressionType.OrElse => "OR", - _ => throw new NotSupportedException($"Binary operator {binary.NodeType} is not supported.") - }; - - // Wrap AND/OR with parentheses for clarity - if (binary.NodeType is ExpressionType.AndAlso or ExpressionType.OrElse) - { - return $"({left} {op} {right})"; - } - - return $"{left} {op} {right}"; - } - - private static string TranslateMemberExpression( - MemberExpression member, - string sourceParam, string sourceAlias, - string targetParam, string targetAlias) - { - if (member.Expression is not ParameterExpression param) - { - throw new NotSupportedException( - "Only direct property access on source or target parameters is supported."); - } - - var alias = param.Name == sourceParam ? sourceAlias : targetAlias; - var propName = member.Member.Name; - - return $"{alias}.[{propName}]"; - } - - private static string TranslateConstant(ConstantExpression constant) - { - if (constant.Value == null) - return "NULL"; - - return constant.Value switch - { - string s => $"'{s.Replace("'", "''")}'", - bool b => b ? "1" : "0", - DateTime dt => $"'{dt:yyyy-MM-dd HH:mm:ss.fff}'", - _ => constant.Value.ToString() ?? "NULL" - }; - } -} diff --git a/NEW/src/JdeScoping.DataSync/Services/MergeConfigurationRegistry.cs b/NEW/src/JdeScoping.DataSync/Services/MergeConfigurationRegistry.cs deleted file mode 100644 index a696309..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/MergeConfigurationRegistry.cs +++ /dev/null @@ -1,29 +0,0 @@ -using JdeScoping.DataSync.Contracts; - -namespace JdeScoping.DataSync.Services; - -/// -/// Registry implementation that resolves configurations from DI. -/// -internal sealed class MergeConfigurationRegistry : IMergeConfigurationRegistry -{ - private readonly IServiceProvider _serviceProvider; - - public MergeConfigurationRegistry(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); - } - - public IMergeConfiguration GetConfiguration() where T : class - { - var config = _serviceProvider.GetService(typeof(IMergeConfiguration)) as IMergeConfiguration; - return config ?? throw new InvalidOperationException( - $"No merge configuration registered for {typeof(T).Name}. " + - $"Register IMergeConfiguration<{typeof(T).Name}> in ServiceCollectionExtensions."); - } - - public bool HasConfiguration() where T : class - { - return _serviceProvider.GetService(typeof(IMergeConfiguration)) != null; - } -} diff --git a/NEW/src/JdeScoping.DataSync/Services/MergeSqlBuilder.cs b/NEW/src/JdeScoping.DataSync/Services/MergeSqlBuilder.cs deleted file mode 100644 index 66bd044..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/MergeSqlBuilder.cs +++ /dev/null @@ -1,199 +0,0 @@ -using System.Text; - -namespace JdeScoping.DataSync.Services; - -/// -/// Builds SQL statements for bulk merge operations. -/// -internal static class MergeSqlBuilder -{ - /// - /// Builds a SQL statement to create a temp table with the same schema as the source table. - /// - public static string BuildCreateTempTable(string tempTableName, string sourceTableName) - { - ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName); - ArgumentException.ThrowIfNullOrWhiteSpace(sourceTableName); - - return $"SELECT TOP 0 * INTO [{tempTableName}] FROM [{sourceTableName}]"; - } - - /// - /// Builds a SQL MERGE statement. - /// - /// Target table name. - /// Source temp table name. - /// Columns to match on (primary key). - /// Columns to update on match. - /// Optional SQL condition for when to update. Example: "source.[LastUpdateDt] > target.[LastUpdateDt]" - /// Columns to insert for new records. - /// The MERGE SQL statement with output counts. - public static string BuildMerge( - string destinationTable, - string tempTableName, - IReadOnlyList matchColumns, - IReadOnlyList updateColumns, - string? updateWhenClause, - IReadOnlyList insertColumns) - { - ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable); - ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName); - ArgumentNullException.ThrowIfNull(matchColumns); - ArgumentNullException.ThrowIfNull(insertColumns); - - if (matchColumns.Count == 0) - throw new ArgumentException("At least one match column is required.", nameof(matchColumns)); - - if (insertColumns.Count == 0) - throw new ArgumentException("At least one insert column is required.", nameof(insertColumns)); - - var sb = new StringBuilder(); - - // Declare output variable to track insert vs update - sb.AppendLine("DECLARE @InsertCount INT = 0, @UpdateCount INT = 0;"); - sb.AppendLine(); - - // MERGE statement - sb.AppendLine($"MERGE INTO [{destinationTable}] AS target"); - sb.AppendLine($"USING [{tempTableName}] AS source"); - - // ON clause - sb.Append("ON "); - for (int i = 0; i < matchColumns.Count; i++) - { - if (i > 0) - sb.Append(" AND "); - sb.Append($"target.[{matchColumns[i]}] = source.[{matchColumns[i]}]"); - } - sb.AppendLine(); - - // WHEN MATCHED (update) - if (updateColumns != null && updateColumns.Count > 0) - { - sb.Append("WHEN MATCHED"); - if (!string.IsNullOrWhiteSpace(updateWhenClause)) - { - sb.Append($" AND {updateWhenClause}"); - } - sb.AppendLine(" THEN"); - sb.Append(" UPDATE SET "); - - for (int i = 0; i < updateColumns.Count; i++) - { - if (i > 0) - sb.Append(", "); - sb.Append($"target.[{updateColumns[i]}] = source.[{updateColumns[i]}]"); - } - sb.AppendLine(); - } - - // WHEN NOT MATCHED (insert) - sb.AppendLine("WHEN NOT MATCHED THEN"); - sb.Append(" INSERT ("); - sb.Append(string.Join(", ", insertColumns.Select(c => $"[{c}]"))); - sb.AppendLine(")"); - sb.Append(" VALUES ("); - sb.Append(string.Join(", ", insertColumns.Select(c => $"source.[{c}]"))); - sb.AppendLine(")"); - - // OUTPUT clause to count inserts and updates - sb.AppendLine("OUTPUT $action INTO @ActionOutput;"); - sb.AppendLine(); - - // Calculate counts - sb.AppendLine("SELECT @InsertCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'INSERT';"); - sb.AppendLine("SELECT @UpdateCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'UPDATE';"); - sb.AppendLine("SELECT @InsertCount AS InsertCount, @UpdateCount AS UpdateCount;"); - - return sb.ToString(); - } - - /// - /// Builds a simpler MERGE statement that returns just the affected row count. - /// - public static string BuildMergeSimple( - string destinationTable, - string tempTableName, - IReadOnlyList matchColumns, - IReadOnlyList updateColumns, - string? updateWhenClause, - IReadOnlyList insertColumns) - { - ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable); - ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName); - ArgumentNullException.ThrowIfNull(matchColumns); - ArgumentNullException.ThrowIfNull(insertColumns); - - if (matchColumns.Count == 0) - throw new ArgumentException("At least one match column is required.", nameof(matchColumns)); - - if (insertColumns.Count == 0) - throw new ArgumentException("At least one insert column is required.", nameof(insertColumns)); - - var sb = new StringBuilder(); - - // MERGE statement - sb.AppendLine($"MERGE INTO [{destinationTable}] AS target"); - sb.AppendLine($"USING [{tempTableName}] AS source"); - - // ON clause - sb.Append("ON "); - for (int i = 0; i < matchColumns.Count; i++) - { - if (i > 0) - sb.Append(" AND "); - sb.Append($"target.[{matchColumns[i]}] = source.[{matchColumns[i]}]"); - } - sb.AppendLine(); - - // WHEN MATCHED (update) - if (updateColumns != null && updateColumns.Count > 0) - { - sb.Append("WHEN MATCHED"); - if (!string.IsNullOrWhiteSpace(updateWhenClause)) - { - sb.Append($" AND {updateWhenClause}"); - } - sb.AppendLine(" THEN"); - sb.Append(" UPDATE SET "); - - for (int i = 0; i < updateColumns.Count; i++) - { - if (i > 0) - sb.Append(", "); - sb.Append($"target.[{updateColumns[i]}] = source.[{updateColumns[i]}]"); - } - sb.AppendLine(); - } - - // WHEN NOT MATCHED (insert) - sb.AppendLine("WHEN NOT MATCHED THEN"); - sb.Append(" INSERT ("); - sb.Append(string.Join(", ", insertColumns.Select(c => $"[{c}]"))); - sb.AppendLine(")"); - sb.Append(" VALUES ("); - sb.Append(string.Join(", ", insertColumns.Select(c => $"source.[{c}]"))); - sb.Append(")"); - sb.AppendLine(";"); - - return sb.ToString(); - } - - /// - /// Builds a SQL statement to truncate a temp table. - /// - public static string BuildTruncateTempTable(string tempTableName) - { - ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName); - return $"TRUNCATE TABLE [{tempTableName}]"; - } - - /// - /// Builds a SQL statement to drop a temp table. - /// - public static string BuildDropTempTable(string tempTableName) - { - ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName); - return $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE [{tempTableName}]"; - } -} diff --git a/NEW/src/JdeScoping.DataSync/Services/MisDataPostProcessor.cs b/NEW/src/JdeScoping.DataSync/Services/MisDataPostProcessor.cs deleted file mode 100644 index 3271e9d..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/MisDataPostProcessor.cs +++ /dev/null @@ -1,35 +0,0 @@ -using JdeScoping.Core.Interfaces; -using JdeScoping.DataSync.Contracts; -using Microsoft.Extensions.Logging; - -namespace JdeScoping.DataSync.Services; - -/// -/// Post-processor for MIS data to set obsolete dates. -/// -public class MisDataPostProcessor : IPostProcessor -{ - private readonly ILotFinderRepository _repository; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public MisDataPostProcessor( - ILotFinderRepository repository, - ILogger logger) - { - _repository = repository ?? throw new ArgumentNullException(nameof(repository)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - public async Task ProcessAsync(string tableName, CancellationToken cancellationToken = default) - { - _logger.LogDebug("Running MIS data post-processing for {Table}", tableName); - - await _repository.PostProcessMisDataAsync(cancellationToken); - - _logger.LogDebug("MIS data post-processing completed for {Table}", tableName); - } -} diff --git a/NEW/src/JdeScoping.DataSync/Services/SchemaValidator.cs b/NEW/src/JdeScoping.DataSync/Services/SchemaValidator.cs deleted file mode 100644 index e5cb235..0000000 --- a/NEW/src/JdeScoping.DataSync/Services/SchemaValidator.cs +++ /dev/null @@ -1,223 +0,0 @@ -using System.Data; -using System.Reflection; -using Dapper; -using JdeScoping.DataSync.Contracts; -using JdeScoping.DataSync.Exceptions; -using JdeScoping.DataSync.Models; - -namespace JdeScoping.DataSync.Services; - -/// -/// Validates data against database table schema. -/// -internal sealed class SchemaValidator : ISchemaValidator -{ - /// - public async Task> GetTableSchemaAsync( - IDbConnection connection, - string tableName, - CancellationToken cancellationToken = default) - { - ArgumentNullException.ThrowIfNull(connection); - ArgumentException.ThrowIfNullOrWhiteSpace(tableName); - - // Parse schema and table name - var (schemaName, table) = ParseTableName(tableName); - - const string sql = """ - SELECT - COLUMN_NAME AS Name, - DATA_TYPE AS DataType, - CHARACTER_MAXIMUM_LENGTH AS MaxLength, - NUMERIC_PRECISION AS [Precision], - NUMERIC_SCALE AS Scale, - CASE WHEN IS_NULLABLE = 'YES' THEN 1 ELSE 0 END AS IsNullable, - ORDINAL_POSITION AS OrdinalPosition - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = @Schema AND TABLE_NAME = @Table - ORDER BY ORDINAL_POSITION - """; - - var columns = await connection.QueryAsync( - new CommandDefinition( - sql, - new { Schema = schemaName, Table = table }, - cancellationToken: cancellationToken)); - - return columns.ToList(); - } - - /// - public IReadOnlyList ValidateBatch( - IReadOnlyList data, - IReadOnlyList schema, - int maxErrors = 100) where T : class - { - ArgumentNullException.ThrowIfNull(data); - ArgumentNullException.ThrowIfNull(schema); - - if (data.Count == 0 || schema.Count == 0) - return []; - - var errors = new List(); - var schemaLookup = schema.ToDictionary(s => s.Name, StringComparer.OrdinalIgnoreCase); - var properties = typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance) - .ToDictionary(p => p.Name, StringComparer.OrdinalIgnoreCase); - - for (int rowIndex = 0; rowIndex < data.Count; rowIndex++) - { - var row = data[rowIndex]; - if (row == null) continue; - - foreach (var column in schema) - { - if (!properties.TryGetValue(column.Name, out var property)) - continue; - - var value = property.GetValue(row); - var error = ValidateValue(rowIndex, column, value); - - if (error != null) - { - errors.Add(error); - - if (maxErrors > 0 && errors.Count >= maxErrors) - return errors; - } - } - } - - return errors; - } - - private static ValidationError? ValidateValue(int rowIndex, ColumnSchema column, object? value) - { - // Check nullability - if (value == null || (value is string s && string.IsNullOrEmpty(s))) - { - if (!column.IsNullable && !IsIdentityOrComputed(column)) - { - return new ValidationError( - rowIndex, - column.Name, - value, - $"Column '{column.Name}' does not allow null values."); - } - return null; - } - - // Check string length - if (column.MaxLength.HasValue && column.MaxLength.Value > 0) - { - var stringValue = value as string ?? value.ToString(); - if (stringValue != null && stringValue.Length > column.MaxLength.Value) - { - return new ValidationError( - rowIndex, - column.Name, - value, - $"Value for '{column.Name}' exceeds maximum length of {column.MaxLength}. Actual length: {stringValue.Length}."); - } - } - - // Check decimal precision/scale - if (column.Precision.HasValue && column.Scale.HasValue && IsDecimalType(column.DataType)) - { - if (value is decimal decimalValue) - { - var error = ValidateDecimal(rowIndex, column, decimalValue); - if (error != null) - return error; - } - else if (value is double || value is float) - { - // Convert to decimal for validation - try - { - var decVal = Convert.ToDecimal(value); - var error = ValidateDecimal(rowIndex, column, decVal); - if (error != null) - return error; - } - catch - { - return new ValidationError( - rowIndex, - column.Name, - value, - $"Value for '{column.Name}' cannot be converted to decimal."); - } - } - } - - return null; - } - - private static ValidationError? ValidateDecimal(int rowIndex, ColumnSchema column, decimal value) - { - // Calculate actual precision and scale - var sqlString = value.ToString(System.Globalization.CultureInfo.InvariantCulture); - var parts = sqlString.TrimStart('-').Split('.'); - var integerDigits = parts[0].TrimStart('0').Length; - var decimalDigits = parts.Length > 1 ? parts[1].TrimEnd('0').Length : 0; - - // Handle zero case - if (integerDigits == 0 && decimalDigits == 0) - integerDigits = 1; - - var totalDigits = integerDigits + decimalDigits; - var maxTotalDigits = column.Precision!.Value; - var maxDecimalDigits = column.Scale!.Value; - var maxIntegerDigits = maxTotalDigits - maxDecimalDigits; - - if (integerDigits > maxIntegerDigits) - { - return new ValidationError( - rowIndex, - column.Name, - value, - $"Value {value} for '{column.Name}' exceeds maximum integer digits. " + - $"Maximum: {maxIntegerDigits}, Actual: {integerDigits}."); - } - - // Note: We don't validate decimal digits as SQL Server will truncate/round them - return null; - } - - private static bool IsDecimalType(string dataType) - { - return dataType.Equals("decimal", StringComparison.OrdinalIgnoreCase) || - dataType.Equals("numeric", StringComparison.OrdinalIgnoreCase) || - dataType.Equals("money", StringComparison.OrdinalIgnoreCase) || - dataType.Equals("smallmoney", StringComparison.OrdinalIgnoreCase); - } - - private static bool IsIdentityOrComputed(ColumnSchema column) - { - // Identity columns and computed columns can be null in the source data - // This is a simple heuristic - actual identity check would require additional metadata - return column.Name.Equals("Id", StringComparison.OrdinalIgnoreCase) || - column.Name.EndsWith("Id", StringComparison.OrdinalIgnoreCase); - } - - private static (string Schema, string Table) ParseTableName(string tableName) - { - // Remove brackets if present - var cleaned = tableName.Replace("[", "").Replace("]", ""); - - // Check for temp table - if (cleaned.StartsWith('#')) - { - return ("dbo", cleaned); - } - - // Split by dot - var parts = cleaned.Split('.'); - return parts.Length switch - { - 1 => ("dbo", parts[0]), - 2 => (parts[0], parts[1]), - _ => throw new ArgumentException($"Invalid table name format: {tableName}") - }; - } -}