Initial commit: JDE Scoping Tool migration project

Set up repository with legacy .NET Framework 4.8 source (OLD/),
new .NET 10 Blazor solution (NEW/), OpenSpec specifications,
documentation, and project configuration.
This commit is contained in:
Joseph Doherty
2026-01-02 07:43:29 -05:00
commit 26ff8d9b4f
1761 changed files with 596509 additions and 0 deletions
@@ -0,0 +1,29 @@
using JdeScoping.Core.Models.Inventory;
using JdeScoping.Core.Models.Organization;
using JdeScoping.Core.Models.Quality;
using JdeScoping.Core.Models.WorkOrders;
namespace JdeScoping.DataSync;
/// <summary>
/// Registry of types that support bulk copy operations.
/// The source generator analyzes this list to generate IDataReader implementations.
/// </summary>
/// <remarks>
/// NOTE(review): because a source generator reads this declaration, it presumably depends on the
/// initializer staying a plain list of <c>typeof(...)</c> expressions - confirm against the
/// generator before restructuring this field.
/// </remarks>
public static class BulkCopyTypeRegistry
{
/// <summary>
/// Types that support bulk copy. Add new types here to generate converters.
/// </summary>
/// <remarks>
/// The field is <c>readonly</c>, but the array it references is still mutable; treat it as
/// read-only at run time.
/// </remarks>
public static readonly Type[] Types =
[
typeof(WorkOrder),
typeof(Lot),
typeof(LotUsage),
typeof(Item),
typeof(WorkCenter),
typeof(ProfitCenter),
typeof(JdeUser),
typeof(Branch),
typeof(MisData),
];
}
@@ -0,0 +1,78 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Settings describing how one data source table is synchronized.
/// </summary>
public class DataSourceConfig
{
    /// <summary>
    /// Name of the target table in the SQL Server cache.
    /// </summary>
    public required string TableName { get; set; }

    /// <summary>
    /// Source system the data comes from: "JDE" or "CMS".
    /// </summary>
    public required string SourceSystem { get; set; }

    /// <summary>
    /// Identifier of the source data (e.g., "WORKORDER", "LOTUSAGE"). Defaults to an empty string.
    /// </summary>
    public string SourceData { get; set; } = "";

    /// <summary>
    /// Type name of the IDataFetcher implementation (without generic suffix).
    /// </summary>
    public required string FetcherTypeName { get; set; }

    /// <summary>
    /// Type name of an IPostProcessor implementation; optional, null when not set.
    /// </summary>
    public string? PostProcessorTypeName { get; set; }

    /// <summary>
    /// Whether this data source takes part in syncing. Defaults to <c>true</c>.
    /// </summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>
    /// Schedule settings for the mass sync.
    /// </summary>
    public ScheduleConfig MassConfig { get; set; } = new ScheduleConfig();

    /// <summary>
    /// Schedule settings for the daily incremental sync.
    /// </summary>
    public ScheduleConfig DailyConfig { get; set; } = new ScheduleConfig();

    /// <summary>
    /// Schedule settings for the hourly incremental sync.
    /// </summary>
    public ScheduleConfig HourlyConfig { get; set; } = new ScheduleConfig();
}
/// <summary>
/// Schedule settings for one sync type (Mass/Daily/Hourly).
/// </summary>
public class ScheduleConfig
{
    /// <summary>
    /// Whether this schedule is active. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Number of minutes between syncs.
    /// </summary>
    public int IntervalMinutes { get; set; }

    /// <summary>
    /// Whether the table is truncated before syncing (mass updates only). Defaults to <c>false</c>.
    /// </summary>
    public bool PrepurgeData { get; set; }

    /// <summary>
    /// Whether indexes are rebuilt after syncing (mass updates only). Defaults to <c>false</c>.
    /// </summary>
    public bool ReIndexData { get; set; }
}
@@ -0,0 +1,65 @@
using System.ComponentModel.DataAnnotations;
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration options for the data synchronization service.
/// </summary>
/// <remarks>
/// The range attributes below are data annotations; they only take effect where the options
/// are validated with data-annotation validation.
/// </remarks>
public class DataSyncOptions
{
    /// <summary>
    /// Configuration section name in appsettings.json.
    /// </summary>
    public const string SectionName = "DataSync";

    /// <summary>
    /// Time between schedule checks (default: 1 minute).
    /// </summary>
    /// <remarks>
    /// Bounded to 1 second .. 1 day so a zero, negative, or oversized value is rejected by
    /// validation instead of reaching the polling delay. The limits are parsed with the
    /// invariant culture so they do not depend on the host's locale.
    /// </remarks>
    [Range(typeof(TimeSpan), "00:00:01", "1.00:00:00", ParseLimitsInInvariantCulture = true)]
    public TimeSpan CheckInterval { get; set; } = TimeSpan.FromMinutes(1);

    /// <summary>
    /// Maximum parallel sync operations (default: 8).
    /// </summary>
    [Range(1, 32)]
    public int MaxDegreeOfParallelism { get; set; } = 8;

    /// <summary>
    /// Records per batch for streaming (default: 1,000,000).
    /// </summary>
    [Range(1000, 10_000_000)]
    public int BatchSize { get; set; } = 1_000_000;

    /// <summary>
    /// Rows per bulk copy batch (default: 10,000).
    /// </summary>
    [Range(100, 100_000)]
    public int BulkCopyBatchSize { get; set; } = 10_000;

    /// <summary>
    /// Multiplier for lookback window (default: 3).
    /// </summary>
    [Range(1, 10)]
    public int LookbackMultiplier { get; set; } = 3;

    /// <summary>
    /// Days to retain DataUpdate history (default: 30).
    /// </summary>
    [Range(1, 365)]
    public int PurgeRetentionDays { get; set; } = 30;

    /// <summary>
    /// Timeout in seconds for sync operations (default: 3600 = 1 hour).
    /// </summary>
    [Range(60, 86400)]
    public int SyncTimeoutSeconds { get; set; } = 3600;

    /// <summary>
    /// Whether the data sync service is enabled (default: true).
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Per-table data source configurations. Defaults to an empty list.
    /// </summary>
    public List<DataSourceConfig> DataSources { get; set; } = [];
}
@@ -0,0 +1,24 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Branch entities.
/// </summary>
/// <remarks>
/// Targets the "Branch" table, matches rows on <c>Code</c>, and on a match writes only
/// <c>Description</c>, and only when the source row's <c>LastUpdateDt</c> is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class BranchMergeConfiguration : IMergeConfiguration<Branch>
{
/// <summary>Destination table name.</summary>
public string TableName => "Branch";
/// <summary>Match key: <c>Code</c>.</summary>
public Expression<Func<Branch, object>> MatchOn =>
x => x.Code;
/// <summary>Column written on a match: <c>Description</c> only.</summary>
public Expression<Func<Branch, object>>? UpdateColumns =>
x => x.Description;
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<Branch, Branch, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<Branch, object>>? InsertColumns => null;
}
@@ -0,0 +1,30 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Item entities.
/// </summary>
/// <remarks>
/// Targets the "Item" table, matches rows on <c>ShortItemNumber</c>, and on a match writes the
/// four columns listed in <c>UpdateColumns</c>, only when the source row's <c>LastUpdateDt</c>
/// is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class ItemMergeConfiguration : IMergeConfiguration<Item>
{
/// <summary>Destination table name.</summary>
public string TableName => "Item";
/// <summary>Match key: <c>ShortItemNumber</c>.</summary>
public Expression<Func<Item, object>> MatchOn =>
x => x.ShortItemNumber;
/// <summary>Columns written on a match.</summary>
public Expression<Func<Item, object>>? UpdateColumns =>
x => new
{
x.ItemNumber,
x.Description,
x.PlanningFamily,
x.StockingType
};
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<Item, Item, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<Item, object>>? InsertColumns => null;
}
@@ -0,0 +1,24 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for JdeUser entities.
/// </summary>
/// <remarks>
/// Targets the "JdeUser" table, matches rows on <c>AddressNumber</c>, and on a match writes
/// <c>UserId</c> and <c>FullName</c>, only when the source row's <c>LastUpdateDt</c> is later
/// than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class JdeUserMergeConfiguration : IMergeConfiguration<JdeUser>
{
/// <summary>Destination table name.</summary>
public string TableName => "JdeUser";
/// <summary>Match key: <c>AddressNumber</c>.</summary>
public Expression<Func<JdeUser, object>> MatchOn =>
x => x.AddressNumber;
/// <summary>Columns written on a match: <c>UserId</c> and <c>FullName</c>.</summary>
public Expression<Func<JdeUser, object>>? UpdateColumns =>
x => new { x.UserId, x.FullName };
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<JdeUser, JdeUser, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<JdeUser, object>>? InsertColumns => null;
}
@@ -0,0 +1,33 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Lot entities.
/// </summary>
/// <remarks>
/// Targets the "Lot" table, matches rows on the composite key (<c>LotNumber</c>,
/// <c>BranchCode</c>), and on a match writes the columns listed in <c>UpdateColumns</c>, only
/// when the source row's <c>LastUpdateDt</c> is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class LotMergeConfiguration : IMergeConfiguration<Lot>
{
/// <summary>Destination table name.</summary>
public string TableName => "Lot";
/// <summary>Composite match key: <c>LotNumber</c> + <c>BranchCode</c>.</summary>
public Expression<Func<Lot, object>> MatchOn =>
x => new { x.LotNumber, x.BranchCode };
/// <summary>Columns written on a match.</summary>
public Expression<Func<Lot, object>>? UpdateColumns =>
x => new
{
x.ShortItemNumber,
x.ItemNumber,
x.SupplierCode,
x.StatusCode,
x.Memo1,
x.Memo2,
x.Memo3
};
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<Lot, Lot, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<Lot, object>>? InsertColumns => null;
}
@@ -0,0 +1,31 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for LotUsage entities.
/// </summary>
/// <remarks>
/// Targets the "LotUsage" table, matches rows on <c>UniqueId</c>, and on a match writes the
/// columns listed in <c>UpdateColumns</c>, only when the source row's <c>LastUpdateDt</c> is
/// later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class LotUsageMergeConfiguration : IMergeConfiguration<LotUsage>
{
/// <summary>Destination table name.</summary>
public string TableName => "LotUsage";
/// <summary>Match key: <c>UniqueId</c>.</summary>
public Expression<Func<LotUsage, object>> MatchOn =>
x => x.UniqueId;
/// <summary>Columns written on a match.</summary>
public Expression<Func<LotUsage, object>>? UpdateColumns =>
x => new
{
x.WorkOrderNumber,
x.LotNumber,
x.BranchCode,
x.ShortItemNumber,
x.Quantity
};
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<LotUsage, LotUsage, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<LotUsage, object>>? InsertColumns => null;
}
@@ -0,0 +1,34 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Quality;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for MisData entities.
/// </summary>
/// <remarks>
/// Targets the "MisData" table and matches rows on a five-part composite key. Unlike the
/// configurations that compare <c>LastUpdateDt</c>, this one supplies no update condition, so
/// every matched row is updated with the columns listed in <c>UpdateColumns</c>.
/// </remarks>
public sealed class MisDataMergeConfiguration : IMergeConfiguration<MisData>
{
/// <summary>Destination table name.</summary>
public string TableName => "MisData";
/// <summary>
/// Composite match key: <c>ItemNumber</c>, <c>BranchCode</c>, <c>SequenceNumber</c>,
/// <c>MisNumber</c>, <c>CharNumber</c>.
/// </summary>
public Expression<Func<MisData, object>> MatchOn =>
x => new { x.ItemNumber, x.BranchCode, x.SequenceNumber, x.MisNumber, x.CharNumber };
/// <summary>Columns written on a match.</summary>
public Expression<Func<MisData, object>>? UpdateColumns =>
x => new
{
x.RevId,
x.TestDescription,
x.SamplingType,
x.SamplingValue,
x.ToolsGauges,
x.WorkInstructions,
x.Status,
x.ReleaseDate
};
// MisData doesn't have LastUpdateDt, so always update on match
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "always update on match".</summary>
public Expression<Func<MisData, MisData, bool>>? UpdateWhen => null;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<MisData, object>>? InsertColumns => null;
}
@@ -0,0 +1,24 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for ProfitCenter entities.
/// </summary>
/// <remarks>
/// Targets the "ProfitCenter" table, matches rows on <c>Code</c>, and on a match writes only
/// <c>Description</c>, and only when the source row's <c>LastUpdateDt</c> is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class ProfitCenterMergeConfiguration : IMergeConfiguration<ProfitCenter>
{
/// <summary>Destination table name.</summary>
public string TableName => "ProfitCenter";
/// <summary>Match key: <c>Code</c>.</summary>
public Expression<Func<ProfitCenter, object>> MatchOn =>
x => x.Code;
/// <summary>Column written on a match: <c>Description</c> only.</summary>
public Expression<Func<ProfitCenter, object>>? UpdateColumns =>
x => x.Description;
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<ProfitCenter, ProfitCenter, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<ProfitCenter, object>>? InsertColumns => null;
}
@@ -0,0 +1,24 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for WorkCenter entities.
/// </summary>
/// <remarks>
/// Targets the "WorkCenter" table, matches rows on <c>Code</c>, and on a match writes only
/// <c>Description</c>, and only when the source row's <c>LastUpdateDt</c> is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class WorkCenterMergeConfiguration : IMergeConfiguration<WorkCenter>
{
/// <summary>Destination table name.</summary>
public string TableName => "WorkCenter";
/// <summary>Match key: <c>Code</c>.</summary>
public Expression<Func<WorkCenter, object>> MatchOn =>
x => x.Code;
/// <summary>Column written on a match: <c>Description</c> only.</summary>
public Expression<Func<WorkCenter, object>>? UpdateColumns =>
x => x.Description;
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<WorkCenter, WorkCenter, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<WorkCenter, object>>? InsertColumns => null;
}
@@ -0,0 +1,38 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.WorkOrders;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for WorkOrder entities.
/// </summary>
/// <remarks>
/// Targets the "WorkOrder" table, matches rows on the composite key (<c>WorkOrderNumber</c>,
/// <c>BranchCode</c>), and on a match writes the columns listed in <c>UpdateColumns</c>, only
/// when the source row's <c>LastUpdateDt</c> is later than the target's.
/// NOTE(review): <c>LastUpdateDt</c> is compared in <c>UpdateWhen</c> but is not listed in
/// <c>UpdateColumns</c>; confirm the merge helper refreshes it some other way, otherwise the
/// target timestamp would not advance after an update.
/// </remarks>
public sealed class WorkOrderMergeConfiguration : IMergeConfiguration<WorkOrder>
{
/// <summary>Destination table name.</summary>
public string TableName => "WorkOrder";
/// <summary>Composite match key: <c>WorkOrderNumber</c> + <c>BranchCode</c>.</summary>
public Expression<Func<WorkOrder, object>> MatchOn =>
x => new { x.WorkOrderNumber, x.BranchCode };
/// <summary>Columns written on a match.</summary>
public Expression<Func<WorkOrder, object>>? UpdateColumns =>
x => new
{
x.LotNumber,
x.ItemNumber,
x.ShortItemNumber,
x.ParentWorkOrderNumber,
x.OrderQuantity,
x.HeldQuantity,
x.ShippedQuantity,
x.StatusCode,
x.StatusCodeUpdateDt,
x.IssueDate,
x.StartDate,
x.RoutingType
};
/// <summary>Update only when the source row's <c>LastUpdateDt</c> is later than the target's.</summary>
public Expression<Func<WorkOrder, WorkOrder, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
/// <summary>Null, which <c>IMergeConfiguration</c> documents as "all columns".</summary>
public Expression<Func<WorkOrder, object>>? InsertColumns => null; // All columns
}
@@ -0,0 +1,55 @@
using System.Linq.Expressions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Bulk merge helper: moves rows from async enumerable sources into SQL Server tables.
/// </summary>
public interface IBulkMergeHelper
{
    /// <summary>
    /// Merges the rows produced by <paramref name="data"/> into <paramref name="destinationTable"/>.
    /// </summary>
    /// <typeparam name="T">Entity type of the rows.</typeparam>
    /// <param name="data">Source rows to merge.</param>
    /// <param name="destinationTable">Name of the destination SQL table.</param>
    /// <param name="matchOn">Expression selecting the match (primary key) columns.</param>
    /// <param name="updateColumns">Expression selecting the columns updated on a match; null means all non-PK columns.</param>
    /// <param name="updateWhen">Optional condition gating updates; null means always update on a match.</param>
    /// <param name="insertColumns">Expression selecting the columns inserted; null means all columns.</param>
    /// <param name="tempTableName">Optional temp table name; defaults to #TEMP_{destinationTable}.</param>
    /// <param name="batchSize">Rows per batch; 0 means process everything at once.</param>
    /// <param name="validateBeforeCopy">When true, data is validated against the schema before bulk copy.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Result carrying row counts and timing information.</returns>
    Task<MergeResult> MergeAsync<T>(
        IAsyncEnumerable<T> data,
        string destinationTable,
        Expression<Func<T, object>> matchOn,
        Expression<Func<T, object>>? updateColumns = null,
        Expression<Func<T, T, bool>>? updateWhen = null,
        Expression<Func<T, object>>? insertColumns = null,
        string? tempTableName = null,
        int batchSize = 0,
        bool validateBeforeCopy = false,
        CancellationToken cancellationToken = default)
        where T : class;

    /// <summary>
    /// Mass insert (full table refresh) with optional index management:
    /// truncates the table, disables non-clustered indexes, bulk copies the data, rebuilds indexes.
    /// </summary>
    /// <typeparam name="T">Entity type of the rows.</typeparam>
    /// <param name="data">Source rows to insert.</param>
    /// <param name="destinationTable">Name of the destination SQL table.</param>
    /// <param name="rebuildIndexes">True rebuilds indexes after the insert; false only re-enables them.</param>
    /// <param name="batchSize">Rows per bulk copy batch; 0 selects the default (10000).</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Result carrying the row count and timing.</returns>
    Task<MassInsertResult> MassInsertAsync<T>(
        IAsyncEnumerable<T> data,
        string destinationTable,
        bool rebuildIndexes = true,
        int batchSize = 0,
        CancellationToken cancellationToken = default)
        where T : class;
}
@@ -0,0 +1,23 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Contract for reading data out of a source system (JDE/CMS).
/// </summary>
/// <typeparam name="TEntity">Type of entity this fetcher produces.</typeparam>
public interface IDataFetcher<TEntity>
    where TEntity : class
{
    /// <summary>
    /// Streams entities from the source system as an async sequence.
    /// </summary>
    /// <param name="minimumDt">
    /// For incremental fetches, only records modified after this time are returned.
    /// Null requests a full fetch.
    /// </param>
    /// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
    /// <returns>Async enumerable of entities streamed from the source.</returns>
    IAsyncEnumerable<TEntity> FetchAsync(
        DateTime? minimumDt,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Entity type name for logging purposes; the default implementation returns
    /// the simple name of <typeparamref name="TEntity"/>.
    /// </summary>
    string EntityTypeName
    {
        get { return typeof(TEntity).Name; }
    }
}
@@ -0,0 +1,26 @@
using System.Data;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Factory that produces IDataReader instances over IAsyncEnumerable sources.
/// Implementations are typically source-generated.
/// </summary>
public interface IDataReaderFactory
{
    /// <summary>
    /// Wraps an async enumerable source in an IDataReader for use with SqlBulkCopy.
    /// </summary>
    /// <typeparam name="T">Entity type of the source items.</typeparam>
    /// <param name="source">Async enumerable data source to wrap.</param>
    /// <returns>An IDataReader suitable for passing to SqlBulkCopy.</returns>
    /// <exception cref="NotSupportedException">Thrown when no converter exists for type T.</exception>
    IDataReader CreateReader<T>(IAsyncEnumerable<T> source)
        where T : class;

    /// <summary>
    /// Returns the column names of an entity type in ordinal order.
    /// </summary>
    /// <typeparam name="T">Entity type whose columns are requested.</typeparam>
    /// <returns>Column names matching the IDataReader column ordinals.</returns>
    IReadOnlyList<string> GetColumnNames<T>()
        where T : class;
}
@@ -0,0 +1,82 @@
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Infrastructure;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Repository for DataUpdate records.
/// </summary>
public interface IDataUpdateRepository
{
    /// <summary>
    /// Retrieves the most recent data update record for every table/type combination.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Dictionary keyed by "TableName_UpdateType".</returns>
    Task<Dictionary<string, DataUpdate>> GetLastDataUpdatesAsync(
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Creates a data update record carrying the in-progress marker (NumberRecords = -2).
    /// </summary>
    /// <param name="sourceSystem">Name of the source system.</param>
    /// <param name="sourceData">Identifier of the source data.</param>
    /// <param name="tableName">Name of the target table.</param>
    /// <param name="updateType">Kind of update being started.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>ID of the record that was created.</returns>
    Task<int> StartUpdateAsync(
        string sourceSystem,
        string sourceData,
        string tableName,
        UpdateTypes updateType,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Finalizes a data update record with its final status.
    /// </summary>
    /// <param name="updateId">ID of the update record to complete.</param>
    /// <param name="recordCount">Count of records processed (-1 for failure).</param>
    /// <param name="success">True when the update succeeded.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task CompleteUpdateAsync(
        int updateId,
        long recordCount,
        bool success,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Marks any still-open update entries (NumberRecords = -2) as failed.
    /// Called at service startup to clean up interrupted operations.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Count of records updated.</returns>
    Task<int> CloseOpenUpdateEntriesAsync(
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Deletes DataUpdate records older than the given number of days.
    /// </summary>
    /// <param name="retentionDays">How many days of records to keep.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Count of records deleted.</returns>
    Task<int> PurgeOldEntriesAsync(
        int retentionDays,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Retrieves sync status for health check purposes.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>List of per-table sync status records.</returns>
    Task<List<TableSyncStatus>> GetSyncStatusAsync(
        CancellationToken cancellationToken = default);
}
/// <summary>
/// Per-table sync status used for health check reporting.
/// </summary>
/// <param name="TableName">Table the status refers to.</param>
/// <param name="UpdateType">Update type the status refers to.</param>
/// <param name="LastSyncTime">Time of the last sync; nullable.</param>
/// <param name="WasSuccessful">Success flag for the sync.</param>
/// <param name="ExpectedIntervalMinutes">Expected interval, in minutes.</param>
/// <param name="IsOverdue">Overdue flag.</param>
/// <param name="RecentFailures">Count of recent failures.</param>
public record TableSyncStatus(
    string TableName,
    UpdateTypes UpdateType,
    DateTime? LastSyncTime,
    bool WasSuccessful,
    int ExpectedIntervalMinutes,
    bool IsOverdue,
    int RecentFailures);
@@ -0,0 +1,38 @@
using System.Linq.Expressions;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Describes how an entity type is merged into its destination table.
/// </summary>
/// <typeparam name="T">Entity type being merged.</typeparam>
public interface IMergeConfiguration<T>
    where T : class
{
    /// <summary>
    /// Name of the destination table in SQL Server.
    /// </summary>
    string TableName { get; }

    /// <summary>
    /// Expression selecting the columns to match on (primary key).
    /// </summary>
    Expression<Func<T, object>> MatchOn { get; }

    /// <summary>
    /// Expression selecting the columns updated when a row matches.
    /// Null means all non-PK columns.
    /// </summary>
    Expression<Func<T, object>>? UpdateColumns { get; }

    /// <summary>
    /// Condition deciding when an update is performed.
    /// Null means always update on match.
    /// </summary>
    Expression<Func<T, T, bool>>? UpdateWhen { get; }

    /// <summary>
    /// Expression selecting the columns to insert.
    /// Null means all columns.
    /// </summary>
    Expression<Func<T, object>>? InsertColumns { get; }
}
@@ -0,0 +1,22 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Lookup of merge configurations keyed by entity type.
/// </summary>
public interface IMergeConfigurationRegistry
{
    /// <summary>
    /// Returns the merge configuration registered for the given entity type.
    /// </summary>
    /// <typeparam name="T">Entity type to look up.</typeparam>
    /// <returns>The merge configuration for <typeparamref name="T"/>.</returns>
    /// <exception cref="InvalidOperationException">Thrown if no configuration is registered.</exception>
    IMergeConfiguration<T> GetConfiguration<T>()
        where T : class;

    /// <summary>
    /// Reports whether a merge configuration exists for the given entity type.
    /// </summary>
    /// <typeparam name="T">Entity type to look up.</typeparam>
    /// <returns>True if a configuration exists.</returns>
    bool HasConfiguration<T>()
        where T : class;
}
@@ -0,0 +1,15 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Contract for work that runs after a data sync.
/// </summary>
public interface IPostProcessor
{
    /// <summary>
    /// Runs post-processing logic once data has been synced to the table.
    /// </summary>
    /// <param name="tableName">Name of the table that was synced.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Task representing the asynchronous operation.</returns>
    Task ProcessAsync(
        string tableName,
        CancellationToken cancellationToken = default);
}
@@ -0,0 +1,16 @@
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Evaluates schedules to decide which sync tasks are pending.
/// </summary>
public interface IScheduleChecker
{
    /// <summary>
    /// Returns the data update tasks that are pending under the current schedules.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>List of pending tasks to execute.</returns>
    Task<List<DataUpdateTask>> GetPendingTasksAsync(
        CancellationToken cancellationToken = default);
}
@@ -0,0 +1,36 @@
using System.Data;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Checks data against the schema of a database table.
/// </summary>
public interface ISchemaValidator
{
    /// <summary>
    /// Retrieves the schema of a table.
    /// </summary>
    /// <param name="connection">Database connection to use.</param>
    /// <param name="tableName">Name of the table.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>List of column schemas.</returns>
    Task<IReadOnlyList<ColumnSchema>> GetTableSchemaAsync(
        IDbConnection connection,
        string tableName,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Validates one batch of data against a table schema.
    /// </summary>
    /// <typeparam name="T">Entity type of the batch items.</typeparam>
    /// <param name="data">Batch to validate.</param>
    /// <param name="schema">Column schemas the batch is checked against.</param>
    /// <param name="maxErrors">Maximum number of errors collected before stopping (0 = unlimited).</param>
    /// <returns>List of validation errors.</returns>
    IReadOnlyList<ValidationError> ValidateBatch<T>(
        IReadOnlyList<T> data,
        IReadOnlyList<ColumnSchema> schema,
        int maxErrors = 100)
        where T : class;
}
@@ -0,0 +1,14 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Coordinates parallel execution of data sync operations.
/// </summary>
public interface ISyncOrchestrator
{
    /// <summary>
    /// Runs every pending sync operation in parallel.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
    /// <returns>Task representing the async operation.</returns>
    Task ExecutePendingSyncsAsync(
        CancellationToken cancellationToken = default);
}
@@ -0,0 +1,17 @@
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Runs the sync operation for one table.
/// </summary>
public interface ITableSyncOperation
{
    /// <summary>
    /// Performs the sync described by <paramref name="task"/> for a single table.
    /// </summary>
    /// <param name="task">Data update task to execute.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Task representing the async operation.</returns>
    Task ExecuteAsync(
        DataUpdateTask task,
        CancellationToken cancellationToken = default);
}
@@ -0,0 +1,160 @@
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync;
/// <summary>
/// Background service that orchestrates data synchronization from JDE/CMS to SQL Server cache.
/// </summary>
/// <remarks>
/// Each cycle creates a fresh DI scope, asks the <see cref="ISyncOrchestrator"/> to run pending
/// syncs, optionally purges old DataUpdate records, then waits for the configured check interval.
/// Errors inside a cycle are logged and counted; only cancellation of the stopping token ends the loop.
/// </remarks>
public class DataSyncService : BackgroundService
{
    /// <summary>
    /// Interval used when the configured check interval cannot be handed to <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
    /// </summary>
    private static readonly TimeSpan FallbackCheckInterval = TimeSpan.FromMinutes(1);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions<DataSyncOptions> _options;
    private readonly ILogger<DataSyncService> _logger;
    private readonly DataSyncMetrics _metrics;
    private DateTime _lastPurgeCheck = DateTime.MinValue;
    private readonly TimeSpan _purgeCheckInterval = TimeSpan.FromHours(24);

    /// <summary>
    /// Initializes a new instance of the <see cref="DataSyncService"/> class.
    /// </summary>
    /// <param name="scopeFactory">Factory used to create one DI scope per cycle.</param>
    /// <param name="options">Data sync options.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="metrics">Metrics sink used to record cycle errors.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is null.</exception>
    public DataSyncService(
        IServiceScopeFactory scopeFactory,
        IOptions<DataSyncOptions> options,
        ILogger<DataSyncService> logger,
        DataSyncMetrics metrics)
    {
        _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <inheritdoc/>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Value.Enabled)
        {
            _logger.LogInformation("DataSyncService is disabled via configuration");
            return;
        }

        // Resolve once: a non-positive or oversized interval would make Task.Delay throw
        // (or spin), which would otherwise escape the loop and stop the service.
        var checkInterval = GetEffectiveCheckInterval();

        _logger.LogInformation(
            "DataSyncService starting with CheckInterval={CheckInterval}, MaxDegreeOfParallelism={MaxDegreeOfParallelism}",
            checkInterval,
            _options.Value.MaxDegreeOfParallelism);

        // Startup: close any interrupted syncs from prior runs
        await CloseOpenUpdateEntriesAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Create scope for this sync cycle
                await using var scope = _scopeFactory.CreateAsyncScope();
                var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();

                // Check schedules and execute pending syncs
                await orchestrator.ExecutePendingSyncsAsync(stoppingToken);

                // Periodic purge of old DataUpdate records
                await PurgeUpdateEntriesAsync(scope, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Graceful shutdown
                _logger.LogInformation("DataSyncService stopping gracefully");
                break;
            }
            catch (Exception ex)
            {
                // A failed cycle must not stop the service; log, count, and try again next interval.
                _logger.LogError(ex, "Error in sync cycle");
                _metrics.RecordCycleError();
            }

            // Wait before next check
            try
            {
                await Task.Delay(checkInterval, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }

        _logger.LogInformation("DataSyncService stopped");
    }

    /// <summary>
    /// Returns the configured check interval, or <see cref="FallbackCheckInterval"/> when the
    /// configured value is not positive or exceeds <see cref="int.MaxValue"/> milliseconds.
    /// </summary>
    private TimeSpan GetEffectiveCheckInterval()
    {
        var configured = _options.Value.CheckInterval;
        if (configured > TimeSpan.Zero && configured.TotalMilliseconds <= int.MaxValue)
        {
            return configured;
        }

        _logger.LogWarning(
            "Configured CheckInterval {CheckInterval} is not usable; falling back to {FallbackInterval}",
            configured,
            FallbackCheckInterval);
        return FallbackCheckInterval;
    }

    /// <summary>
    /// Closes any open update entries from interrupted prior runs.
    /// Failures are logged and swallowed so startup continues.
    /// </summary>
    /// <param name="cancellationToken">Token that signals service shutdown.</param>
    private async Task CloseOpenUpdateEntriesAsync(CancellationToken cancellationToken)
    {
        try
        {
            await using var scope = _scopeFactory.CreateAsyncScope();
            var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
            var closedCount = await repository.CloseOpenUpdateEntriesAsync(cancellationToken);

            if (closedCount > 0)
            {
                _logger.LogWarning(
                    "Closed {Count} interrupted update entries from prior runs",
                    closedCount);
            }
            else
            {
                _logger.LogDebug("No interrupted update entries found");
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown during startup cleanup is not a failure; the caller's loop
            // sees the cancelled token and exits without running a cycle.
            _logger.LogDebug("Startup cleanup of open update entries cancelled by shutdown");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to close open update entries at startup");
            // Continue starting - this is not fatal
        }
    }

    /// <summary>
    /// Purges old DataUpdate records periodically (at most once per purge interval).
    /// Failures are logged and swallowed; shutdown cancellation propagates to the caller.
    /// </summary>
    /// <param name="scope">Scope of the current sync cycle, used to resolve the repository.</param>
    /// <param name="cancellationToken">Token that signals service shutdown.</param>
    private async Task PurgeUpdateEntriesAsync(AsyncServiceScope scope, CancellationToken cancellationToken)
    {
        var now = DateTime.UtcNow;
        if (now - _lastPurgeCheck < _purgeCheckInterval)
        {
            return;
        }

        // Stamped before the attempt: a failed purge is retried after the next
        // purge interval rather than on every cycle.
        _lastPurgeCheck = now;

        try
        {
            var retentionDays = _options.Value.PurgeRetentionDays;
            var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
            var purgedCount = await repository.PurgeOldEntriesAsync(retentionDays, cancellationToken);

            if (purgedCount > 0)
            {
                _logger.LogInformation(
                    "Purged {Count} DataUpdate records older than {Days} days",
                    purgedCount,
                    retentionDays);
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            // Shutdown cancellation is excluded by the filter so the caller can
            // treat it as a graceful stop instead of logging it as an error here.
            _logger.LogError(ex, "Failed to purge old update entries");
            // Continue - this is not fatal
        }
    }
}
@@ -0,0 +1,106 @@
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.Core.Models.Organization;
using JdeScoping.Core.Models.Quality;
using JdeScoping.Core.Models.WorkOrders;
using JdeScoping.DataSync;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Configuration.MergeConfigurations;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Fetchers.Cms;
using JdeScoping.DataSync.Fetchers.Jde;
using JdeScoping.DataSync.Generated;
using JdeScoping.DataSync.HealthChecks;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.Configuration;
namespace Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Extension methods for registering data sync services.
/// </summary>
public static class DataSyncDependencyInjection
{
    /// <summary>
    /// Adds data synchronization services to the service collection.
    /// </summary>
    /// <remarks>
    /// Registers the options (bound from the <see cref="DataSyncOptions.SectionName"/> section and
    /// validated on start), the hosted <see cref="DataSyncService"/>, the sync pipeline services,
    /// merge configurations, health check, metrics, fetchers and post processors.
    /// </remarks>
    /// <param name="services">The service collection.</param>
    /// <param name="configuration">The configuration.</param>
    /// <returns>The service collection for chaining.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="services"/> or <paramref name="configuration"/> is null.
    /// </exception>
    public static IServiceCollection AddDataSyncServices(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        // Fail fast with a named argument instead of a NullReferenceException mid-registration.
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);

        // Bind configuration with validation
        services.AddOptions<DataSyncOptions>()
            .Bind(configuration.GetSection(DataSyncOptions.SectionName))
            .ValidateDataAnnotations()
            .ValidateOnStart();

        // Register hosted service
        services.AddHostedService<DataSyncService>();

        // Register core services as scoped (for parallel isolation)
        services.AddScoped<ISyncOrchestrator, SyncOrchestrator>();
        services.AddScoped<IScheduleChecker, ScheduleChecker>();
        services.AddScoped<ITableSyncOperation, TableSyncOperation>();
        services.AddScoped<IDataUpdateRepository, DataUpdateRepository>();

        // Register bulk merge services
        services.AddSingleton<IDataReaderFactory, DataReaderFactory>();
        services.AddSingleton<ISchemaValidator, SchemaValidator>();
        services.AddScoped<IBulkMergeHelper, BulkMergeHelper>();

        // Register merge configuration registry
        services.AddSingleton<IMergeConfigurationRegistry, MergeConfigurationRegistry>();

        // Register merge configurations - explicit registration per entity
        services.AddSingleton<IMergeConfiguration<WorkOrder>, WorkOrderMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<Lot>, LotMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<LotUsage>, LotUsageMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<Item>, ItemMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<WorkCenter>, WorkCenterMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<ProfitCenter>, ProfitCenterMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<JdeUser>, JdeUserMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<Branch>, BranchMergeConfiguration>();
        services.AddSingleton<IMergeConfiguration<MisData>, MisDataMergeConfiguration>();

        // Register health check
        services.AddHealthChecks()
            .AddCheck<DataSyncHealthCheck>("data-sync", tags: ["datasync", "background"]);

        // Register metrics as singleton
        services.AddSingleton<DataSyncMetrics>();

        // Register JDE fetchers
        services.AddScoped<IDataFetcher<WorkOrder>, JdeWorkOrderFetcher>();
        services.AddScoped<IDataFetcher<LotUsage>, JdeLotUsageFetcher>();
        services.AddScoped<IDataFetcher<Item>, JdeItemFetcher>();
        services.AddScoped<IDataFetcher<Lot>, JdeLotFetcher>();
        services.AddScoped<IDataFetcher<WorkCenter>, JdeWorkCenterFetcher>();
        services.AddScoped<IDataFetcher<ProfitCenter>, JdeProfitCenterFetcher>();
        services.AddScoped<IDataFetcher<JdeUser>, JdeUserFetcher>();
        services.AddScoped<IDataFetcher<Branch>, JdeBranchFetcher>();

        // Register CMS fetchers
        services.AddScoped<IDataFetcher<MisData>, CmsMisDataFetcher>();

        // Register post processors
        services.AddScoped<IPostProcessor, MisDataPostProcessor>();
        services.AddScoped<MisDataPostProcessor>();

        // Register fetchers by name for dynamic resolution.
        // These concrete-type registrations are separate from the interface registrations above,
        // so resolving by concrete type and by interface yields different instances within a scope.
        services.AddScoped<JdeWorkOrderFetcher>();
        services.AddScoped<JdeLotUsageFetcher>();
        services.AddScoped<JdeItemFetcher>();
        services.AddScoped<JdeLotFetcher>();
        services.AddScoped<JdeWorkCenterFetcher>();
        services.AddScoped<JdeProfitCenterFetcher>();
        services.AddScoped<JdeUserFetcher>();
        services.AddScoped<JdeBranchFetcher>();
        services.AddScoped<CmsMisDataFetcher>();

        return services;
    }
}
@@ -0,0 +1,80 @@
namespace JdeScoping.DataSync.Exceptions;
/// <summary>
/// Raised when a bulk merge operation fails. Optional context about the table,
/// batch and SQL involved can be attached through the init-only properties.
/// </summary>
public class BulkMergeException : Exception
{
    /// <summary>
    /// Creates the exception with the default message and no context.
    /// </summary>
    public BulkMergeException()
        : base()
    {
    }

    /// <summary>
    /// Creates the exception with the given message.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    public BulkMergeException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception with the given message and underlying cause.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The exception that triggered this one.</param>
    public BulkMergeException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Name of the destination table; empty when not supplied.
    /// </summary>
    public string TableName { get; init; } = string.Empty;

    /// <summary>
    /// 1-based number of the batch that failed.
    /// </summary>
    public int BatchNumber { get; init; }

    /// <summary>
    /// How many rows the failed batch contained.
    /// </summary>
    public int RowsInBatch { get; init; }

    /// <summary>
    /// SQL statement that was executing when the error occurred, if known.
    /// </summary>
    public string? SqlStatement { get; init; }
}
/// <summary>
/// Raised when a batch fails validation before it is bulk copied.
/// </summary>
public class BulkMergeValidationException : BulkMergeException
{
    /// <summary>
    /// Creates the exception with the default message and no errors.
    /// </summary>
    public BulkMergeValidationException()
        : base()
    {
    }

    /// <summary>
    /// Creates the exception with the given message and no errors.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    public BulkMergeValidationException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception with the given message and the validation errors found.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="errors">Errors to expose through <see cref="Errors"/>; stored as passed.</param>
    public BulkMergeValidationException(string message, IReadOnlyList<ValidationError> errors)
        : base(message)
    {
        Errors = errors;
    }

    /// <summary>
    /// Creates the exception with the given message and underlying cause.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The exception that triggered this one.</param>
    public BulkMergeValidationException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Validation errors that were found; empty unless supplied.
    /// </summary>
    public IReadOnlyList<ValidationError> Errors { get; init; } = [];
}
/// <summary>
/// Describes a single validation failure: the row and column it occurred in,
/// the offending value, and a message explaining the problem.
/// </summary>
/// <param name="RowIndex">Zero-based row index in the batch.</param>
/// <param name="ColumnName">The column name that failed validation.</param>
/// <param name="Value">The value that failed validation; may be <see langword="null"/>.</param>
/// <param name="Message">Human-readable error message.</param>
public record ValidationError(
int RowIndex,
string ColumnName,
object? Value,
string Message);
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Quality;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Cms;
/// <summary>
/// Data fetcher that streams MIS data from CMS (Sybase/Oracle).
/// </summary>
public class CmsMisDataFetcher : IDataFetcher<MisData>
{
    private readonly ICmsDataSource _cmsDataSource;
    private readonly ILogger<CmsMisDataFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="CmsMisDataFetcher"/>.
    /// </summary>
    /// <param name="cmsDataSource">CMS data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public CmsMisDataFetcher(
        ICmsDataSource cmsDataSource,
        ILogger<CmsMisDataFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(cmsDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _cmsDataSource = cmsDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<MisData> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching MIS data from CMS, minimumDT={MinDT}", minimumDtText);

        var records = _cmsDataSource.GetMisDataAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams branch records from JDE.
/// </summary>
public class JdeBranchFetcher : IDataFetcher<Branch>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeBranchFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeBranchFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeBranchFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeBranchFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<Branch> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching branches from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetBranchesAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams item master records from JDE.
/// </summary>
public class JdeItemFetcher : IDataFetcher<Item>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeItemFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeItemFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeItemFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeItemFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<Item> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching items from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetItemsAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams lot master records from JDE.
/// </summary>
public class JdeLotFetcher : IDataFetcher<Lot>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeLotFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeLotFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeLotFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeLotFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<Lot> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching lots from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetLotsAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams lot usage (cardex) records from JDE.
/// </summary>
public class JdeLotUsageFetcher : IDataFetcher<LotUsage>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeLotUsageFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeLotUsageFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeLotUsageFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeLotUsageFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<LotUsage> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching lot usages from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetLotUsagesAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams profit center records from JDE.
/// </summary>
public class JdeProfitCenterFetcher : IDataFetcher<ProfitCenter>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeProfitCenterFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeProfitCenterFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeProfitCenterFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeProfitCenterFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<ProfitCenter> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching profit centers from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetProfitCentersAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams JDE user records from JDE.
/// </summary>
public class JdeUserFetcher : IDataFetcher<JdeUser>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeUserFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeUserFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeUserFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeUserFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<JdeUser> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching JDE users, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetUsersAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams work center records from JDE.
/// </summary>
public class JdeWorkCenterFetcher : IDataFetcher<WorkCenter>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeWorkCenterFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeWorkCenterFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeWorkCenterFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeWorkCenterFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<WorkCenter> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching work centers from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetWorkCentersAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.WorkOrders;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Data fetcher that streams work order records from JDE.
/// </summary>
public class JdeWorkOrderFetcher : IDataFetcher<WorkOrder>
{
    private readonly IJdeDataSource _jdeDataSource;
    private readonly ILogger<JdeWorkOrderFetcher> _logger;

    /// <summary>
    /// Creates a new <see cref="JdeWorkOrderFetcher"/>.
    /// </summary>
    /// <param name="jdeDataSource">JDE data source the records are read from.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
    public JdeWorkOrderFetcher(
        IJdeDataSource jdeDataSource,
        ILogger<JdeWorkOrderFetcher> logger)
    {
        ArgumentNullException.ThrowIfNull(jdeDataSource);
        ArgumentNullException.ThrowIfNull(logger);

        _jdeDataSource = jdeDataSource;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async IAsyncEnumerable<WorkOrder> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Round-trip ("o") timestamp, or the literal "null" when no lower bound is given.
        var minimumDtText = minimumDt.HasValue ? minimumDt.Value.ToString("o") : "null";
        _logger.LogDebug("Fetching work orders from JDE, minimumDT={MinDT}", minimumDtText);

        var records = _jdeDataSource.GetWorkOrdersAsync(minimumDt, cancellationToken);
        await foreach (var record in records)
        {
            yield return record;
        }
    }
}
@@ -0,0 +1,40 @@
using System.Runtime.CompilerServices;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers;
/// <summary>
/// Development-time placeholder fetcher. Its default <see cref="FetchAsync"/>
/// logs a warning and yields nothing; production fetchers query the source databases.
/// </summary>
/// <typeparam name="TEntity">The entity type being fetched.</typeparam>
public abstract class MockDataFetcher<TEntity> : IDataFetcher<TEntity> where TEntity : class
{
    /// <summary>
    /// Logger available to derived fetchers.
    /// </summary>
    protected readonly ILogger Logger;

    /// <summary>
    /// Creates the mock fetcher.
    /// </summary>
    /// <param name="logger">Logger the warning is written to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <see langword="null"/>.</exception>
    protected MockDataFetcher(ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(logger);
        Logger = logger;
    }

    /// <inheritdoc/>
    public virtual async IAsyncEnumerable<TEntity> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var entityTypeName = typeof(TEntity).Name;
        Logger.LogWarning(
            "MockDataFetcher<{EntityType}> returning empty results. Implement real fetcher for production.",
            entityTypeName);

        // An already-completed task is awaited so the async iterator contains an await;
        // the sequence then ends without yielding any element.
        await Task.CompletedTask;
        yield break;
    }
}
@@ -0,0 +1,86 @@
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace JdeScoping.DataSync.HealthChecks;
/// <summary>
/// Health check for data sync operations. Reads the sync status rows from the
/// repository and derives a health result from how many tables are overdue or
/// have recent failures.
/// </summary>
public class DataSyncHealthCheck : IHealthCheck
{
    private readonly IDataUpdateRepository _repository;

    /// <summary>
    /// Initializes a new instance of the <see cref="DataSyncHealthCheck"/> class.
    /// </summary>
    /// <param name="repository">Repository that supplies the sync status rows.</param>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is <see langword="null"/>.</exception>
    public DataSyncHealthCheck(IDataUpdateRepository repository)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    /// <summary>
    /// Evaluates sync health.
    /// </summary>
    /// <param name="context">Health check context (not used by this check).</param>
    /// <param name="cancellationToken">Token forwarded to the repository query.</param>
    /// <returns>
    /// Unhealthy when more than two tables have recent failures or the status query fails;
    /// Degraded when any table has recent failures or more than half of the tables are overdue;
    /// Healthy otherwise. Per-status details and the aggregate counts are attached as data.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The check was cancelled; cancellation propagates to the caller instead of being
    /// reported as an unhealthy result.
    /// </exception>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Materialize once: the rows are walked several times below and the
            // repository may hand back a lazily evaluated sequence.
            var statuses = (await _repository.GetSyncStatusAsync(cancellationToken)).ToList();
            var data = new Dictionary<string, object>();

            foreach (var status in statuses)
            {
                var key = $"{status.TableName}_{status.UpdateType}";
                data[$"{key}_LastSync"] = status.LastSyncTime?.ToString("O") ?? "Never";
                data[$"{key}_Status"] = status.IsOverdue ? "Overdue" : "Current";
                data[$"{key}_RecentFailures"] = status.RecentFailures;
            }

            // A table can contribute several rows (one per update type). All three
            // figures are counted per distinct table so that they share a unit with
            // each other and with the "tables" wording of the messages below.
            var totalTables = statuses
                .Select(s => s.TableName)
                .Distinct()
                .Count();
            var overdueCount = statuses
                .Where(s => s.IsOverdue)
                .Select(s => s.TableName)
                .Distinct()
                .Count();
            var failedCount = statuses
                .Where(s => s.RecentFailures > 0)
                .Select(s => s.TableName)
                .Distinct()
                .Count();

            data["TotalTables"] = totalTables;
            data["OverdueCount"] = overdueCount;
            data["FailedCount"] = failedCount;

            if (failedCount > 2)
            {
                return HealthCheckResult.Unhealthy(
                    $"Multiple recent sync failures ({failedCount} tables)",
                    data: data);
            }

            if (failedCount > 0)
            {
                return HealthCheckResult.Degraded(
                    $"{failedCount} tables with recent failures",
                    data: data);
            }

            // Integer division: "more than half" of the distinct tables.
            if (overdueCount > totalTables / 2)
            {
                return HealthCheckResult.Degraded(
                    $"{overdueCount} tables overdue for sync",
                    data: data);
            }

            if (overdueCount > 0)
            {
                return HealthCheckResult.Healthy(
                    $"All syncs progressing ({overdueCount} tables slightly overdue)",
                    data: data);
            }

            return HealthCheckResult.Healthy(
                "All syncs current",
                data: data);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is deliberately excluded so it is not misreported as
            // an unhealthy data sync.
            return HealthCheckResult.Unhealthy(
                "Unable to check sync status",
                exception: ex);
        }
    }
}
@@ -0,0 +1,33 @@
<!-- Project file for the JdeScoping.DataSync library (targets net10.0). -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<!-- Expose internal types to the unit and integration test assemblies. -->
<!-- NOTE(review): DynamicProxyGenAssembly2 is presumably the runtime proxy assembly of the mocking library used by the tests; confirm. -->
<ItemGroup>
<InternalsVisibleTo Include="JdeScoping.DataSync.Tests" />
<InternalsVisibleTo Include="JdeScoping.DataSync.IntegrationTests" />
<InternalsVisibleTo Include="DynamicProxyGenAssembly2" />
</ItemGroup>
<!-- Project dependencies. The SourceGenerators project is referenced as an analyzer only: its output assembly is not referenced by this project. -->
<ItemGroup>
<ProjectReference Include="..\JdeScoping.Core\JdeScoping.Core.csproj" />
<ProjectReference Include="..\JdeScoping.DataAccess\JdeScoping.DataAccess.csproj" />
<ProjectReference Include="..\JdeScoping.DataSync.SourceGenerators\JdeScoping.DataSync.SourceGenerators.csproj"
OutputItemType="Analyzer"
ReferenceOutputAssembly="false" />
</ItemGroup>
<!-- NuGet package dependencies. -->
<ItemGroup>
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="10.0.1" />
</ItemGroup>
</Project>
@@ -0,0 +1,20 @@
namespace JdeScoping.DataSync.Models;
/// <summary>
/// Represents the schema of a single database column.
/// </summary>
/// <param name="Name">Column name.</param>
/// <param name="DataType">SQL Server data type name (e.g., "nvarchar", "int", "decimal").</param>
/// <param name="MaxLength">Maximum length for string columns (-1 for MAX); nullable.</param>
/// <param name="Precision">Precision for decimal columns; nullable.</param>
/// <param name="Scale">Scale for decimal columns; nullable.</param>
/// <param name="IsNullable">Whether the column allows null values.</param>
/// <param name="OrdinalPosition">Column ordinal position (1-based).</param>
public record ColumnSchema(
string Name,
string DataType,
int? MaxLength,
int? Precision,
int? Scale,
bool IsNullable,
int OrdinalPosition);
@@ -0,0 +1,62 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
namespace JdeScoping.DataSync.Models;
/// <summary>
/// Describes one pending data update that is waiting to be executed.
/// </summary>
public class DataUpdateTask
{
    /// <summary>
    /// Identifier of this operation (used for staging table suffixes).
    /// A fresh GUID is generated unless one is supplied.
    /// </summary>
    public Guid OperationId { get; init; } = Guid.NewGuid();

    /// <summary>
    /// Name of the target table in the SQL Server cache.
    /// </summary>
    public required string TableName { get; init; }

    /// <summary>
    /// Source system: "JDE" or "CMS".
    /// </summary>
    public required string SourceSystem { get; init; }

    /// <summary>
    /// Identifier of the source data; empty when not supplied.
    /// </summary>
    public string SourceData { get; init; } = string.Empty;

    /// <summary>
    /// Kind of update (Hourly, Daily, Mass).
    /// </summary>
    public UpdateTypes UpdateType { get; init; }

    /// <summary>
    /// Lower datetime bound for incremental updates.
    /// Null for mass updates (full refresh).
    /// </summary>
    public DateTime? MinimumDt { get; init; }

    /// <summary>
    /// Data source configuration this task belongs to.
    /// </summary>
    public required DataSourceConfig Config { get; init; }

    /// <summary>
    /// Key for logging purposes, built from the table name, update type and operation id.
    /// </summary>
    public string LogKey
    {
        get { return $"{TableName}_{UpdateType}_{OperationId:N}"; }
    }

    /// <summary>
    /// Schedule configuration that matches <see cref="UpdateType"/>.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <see cref="UpdateType"/> is not Mass, Daily or Hourly.
    /// </exception>
    public ScheduleConfig ScheduleConfig
    {
        get
        {
            switch (UpdateType)
            {
                case UpdateTypes.Mass:
                    return Config.MassConfig;
                case UpdateTypes.Daily:
                    return Config.DailyConfig;
                case UpdateTypes.Hourly:
                    return Config.HourlyConfig;
                default:
                    throw new ArgumentOutOfRangeException(nameof(UpdateType));
            }
        }
    }
}
@@ -0,0 +1,33 @@
namespace JdeScoping.DataSync.Models;
/// <summary>
/// Outcome of a bulk merge operation.
/// </summary>
/// <param name="TotalRowsProcessed">Total number of rows processed from source.</param>
/// <param name="RowsInserted">Number of rows inserted (new records).</param>
/// <param name="RowsUpdated">Number of rows updated (existing records).</param>
/// <param name="BatchCount">Number of batches processed.</param>
/// <param name="Elapsed">Total elapsed time for the operation.</param>
public record MergeResult(
    int TotalRowsProcessed,
    int RowsInserted,
    int RowsUpdated,
    int BatchCount,
    TimeSpan Elapsed)
{
    /// <summary>
    /// Sum of <see cref="RowsInserted"/> and <see cref="RowsUpdated"/>.
    /// </summary>
    public int TotalRowsAffected
    {
        get { return RowsUpdated + RowsInserted; }
    }
}
/// <summary>
/// Result of a mass insert operation.
/// </summary>
/// <param name="TotalRowsInserted">Total rows inserted.</param>
/// <param name="Elapsed">Total elapsed time.</param>
/// <param name="IndexesRebuilt">Whether indexes were rebuilt (vs just re-enabled).</param>
public record MassInsertResult(
long TotalRowsInserted,
TimeSpan Elapsed,
bool IndexesRebuilt);
@@ -0,0 +1,432 @@
using System.Diagnostics;
using System.Linq.Expressions;
using Dapper;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Implements bulk merge operations using temp tables and SQL MERGE statements.
/// </summary>
internal sealed class BulkMergeHelper : IBulkMergeHelper
{
private const int DefaultBatchSize = 10000;
private readonly IDbConnectionFactory _connectionFactory;
private readonly IDataReaderFactory _dataReaderFactory;
private readonly ISchemaValidator _schemaValidator;
private readonly ILogger<BulkMergeHelper> _logger;
/// <summary>
/// Creates the helper with its required collaborators.
/// </summary>
/// <param name="connectionFactory">Factory used to open the database connection.</param>
/// <param name="dataReaderFactory">Factory that supplies column names and data readers for entity types.</param>
/// <param name="schemaValidator">Validator used to load table schemas and validate batches.</param>
/// <param name="logger">Logger for progress and failure messages.</param>
/// <exception cref="ArgumentNullException">An argument is <see langword="null"/>.</exception>
public BulkMergeHelper(
    IDbConnectionFactory connectionFactory,
    IDataReaderFactory dataReaderFactory,
    ISchemaValidator schemaValidator,
    ILogger<BulkMergeHelper> logger)
{
    ArgumentNullException.ThrowIfNull(connectionFactory);
    ArgumentNullException.ThrowIfNull(dataReaderFactory);
    ArgumentNullException.ThrowIfNull(schemaValidator);
    ArgumentNullException.ThrowIfNull(logger);

    _connectionFactory = connectionFactory;
    _dataReaderFactory = dataReaderFactory;
    _schemaValidator = schemaValidator;
    _logger = logger;
}
/// <inheritdoc />
public async Task<MergeResult> MergeAsync<T>(
    IAsyncEnumerable<T> data,
    string destinationTable,
    Expression<Func<T, object>> matchOn,
    Expression<Func<T, object>>? updateColumns = null,
    Expression<Func<T, T, bool>>? updateWhen = null,
    Expression<Func<T, object>>? insertColumns = null,
    string? tempTableName = null,
    int batchSize = 0,
    bool validateBeforeCopy = false,
    CancellationToken cancellationToken = default) where T : class
{
    ArgumentNullException.ThrowIfNull(data);
    ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
    ArgumentNullException.ThrowIfNull(matchOn);

    var timer = Stopwatch.StartNew();

    // A non-positive batch size selects the default.
    var rowsPerBatch = batchSize > 0 ? batchSize : DefaultBatchSize;

    // Without an explicit name, derive one from the destination table:
    // dots become underscores and square brackets are removed.
    var stagingTable = tempTableName
        ?? $"#TEMP_{destinationTable.Replace(".", "_").Replace("[", "").Replace("]", "")}";

    // Translate the selector expressions into column names.
    var keyColumns = ExpressionParser.GetColumnNames(matchOn);
    var entityColumns = _dataReaderFactory.GetColumnNames<T>();

    // Columns to update default to every column that is not a match column.
    var columnsToUpdate = updateColumns == null
        ? entityColumns.Where(column => !keyColumns.Contains(column, StringComparer.OrdinalIgnoreCase)).ToList()
        : ExpressionParser.GetColumnNames(updateColumns);

    // Columns to insert default to every column.
    var columnsToInsert = insertColumns == null
        ? entityColumns
        : ExpressionParser.GetColumnNames(insertColumns);

    // Optional extra condition for the update branch of the MERGE.
    var updateWhenSql = ExpressionParser.BuildUpdateWhenSql(updateWhen);

    _logger.LogDebug(
        "Starting bulk merge to {DestinationTable}. BatchSize={BatchSize}, MatchColumns={MatchColumns}, TempTable={TempTable}",
        destinationTable, rowsPerBatch, string.Join(",", keyColumns), stagingTable);

    await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);

    // The destination schema is only loaded when validation was requested.
    IReadOnlyList<ColumnSchema>? schema = validateBeforeCopy
        ? await _schemaValidator.GetTableSchemaAsync(connection, destinationTable, cancellationToken)
        : null;

    int rowsSeen = 0;
    int insertedSoFar = 0;
    int updatedSoFar = 0;
    int batchesStarted = 0;
    bool stagingTableExists = false;

    try
    {
        await CreateTempTableAsync(connection, stagingTable, destinationTable, cancellationToken);
        stagingTableExists = true;

        // The MERGE statement is the same for every batch, so build it once.
        var mergeStatement = MergeSqlBuilder.BuildMergeSimple(
            destinationTable,
            stagingTable,
            keyColumns,
            columnsToUpdate,
            updateWhenSql,
            columnsToInsert);

        var pending = new List<T>(rowsPerBatch);

        // Pushes the buffered rows through the staging table and folds the
        // outcome into the running totals. The batch number is advanced before
        // the batch runs so that a failure is reported against the right batch.
        async Task FlushPendingAsync()
        {
            batchesStarted++;
            var (inserted, updated) = await ProcessBatchAsync(
                connection, pending, stagingTable, destinationTable,
                mergeStatement, schema, batchesStarted, cancellationToken);

            rowsSeen += pending.Count;
            insertedSoFar += inserted;
            updatedSoFar += updated;
        }

        await foreach (var row in data.WithCancellation(cancellationToken))
        {
            pending.Add(row);
            if (pending.Count < rowsPerBatch)
            {
                continue;
            }

            await FlushPendingAsync();
            pending.Clear();
        }

        // Whatever is left over forms the final, partial batch.
        if (pending.Count > 0)
        {
            await FlushPendingAsync();
        }

        timer.Stop();

        _logger.LogInformation(
            "Bulk merge to {DestinationTable} completed. TotalRows={TotalRows}, Inserted={Inserted}, Updated={Updated}, Batches={Batches}, Elapsed={Elapsed}ms",
            destinationTable, rowsSeen, insertedSoFar, updatedSoFar, batchesStarted, timer.ElapsedMilliseconds);

        return new MergeResult(rowsSeen, insertedSoFar, updatedSoFar, batchesStarted, timer.Elapsed);
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Cancellation is not logged as a failure; everything else is logged and rethrown.
        _logger.LogError(ex, "Bulk merge to {DestinationTable} failed after {Batches} batches and {TotalRows} rows",
            destinationTable, batchesStarted, rowsSeen);
        throw;
    }
    finally
    {
        // Best-effort cleanup: uses CancellationToken.None so the drop still runs
        // after cancellation, and a failed drop is only logged.
        if (stagingTableExists)
        {
            try
            {
                await DropTempTableAsync(connection, stagingTable, CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to drop temp table {TempTable}", stagingTable);
            }
        }
    }
}
/// <summary>
/// Runs one batch: optional validation, bulk copy into the temp table, MERGE
/// into the destination, then truncation of the temp table.
/// </summary>
/// <param name="connection">Open connection that owns the temp table.</param>
/// <param name="batch">Rows of this batch.</param>
/// <param name="tempTableName">Temp table the rows are copied into.</param>
/// <param name="destinationTable">Destination table name, used for error context.</param>
/// <param name="mergeSql">MERGE statement to execute.</param>
/// <param name="schema">Destination schema; validation is skipped when null.</param>
/// <param name="batchNumber">Number of this batch, used in messages and error context.</param>
/// <param name="cancellationToken">Token passed to the copy and SQL commands.</param>
/// <returns>
/// The affected-row count of the MERGE as <c>Inserted</c>; <c>Updated</c> is always 0
/// because the MERGE does not distinguish inserts from updates.
/// </returns>
/// <exception cref="BulkMergeValidationException">Validation reported at least one error.</exception>
/// <exception cref="BulkMergeException">A <see cref="SqlException"/> occurred during the batch.</exception>
private async Task<(int Inserted, int Updated)> ProcessBatchAsync<T>(
    SqlConnection connection,
    IReadOnlyList<T> batch,
    string tempTableName,
    string destinationTable,
    string mergeSql,
    IReadOnlyList<ColumnSchema>? schema,
    int batchNumber,
    CancellationToken cancellationToken) where T : class
{
    try
    {
        // Validation only happens when the caller supplied a schema.
        if (schema is not null)
        {
            var validationErrors = _schemaValidator.ValidateBatch(batch, schema);
            if (validationErrors.Count > 0)
            {
                var summary = $"Validation failed for batch {batchNumber} with {validationErrors.Count} errors.";
                throw new BulkMergeValidationException(summary, validationErrors)
                {
                    TableName = destinationTable,
                    BatchNumber = batchNumber,
                    RowsInBatch = batch.Count
                };
            }
        }

        // Stage the rows.
        await BulkCopyAsync(connection, batch, tempTableName, cancellationToken);

        // Merge the staged rows into the destination.
        var mergeCommand = new CommandDefinition(mergeSql, cancellationToken: cancellationToken);
        var rowsAffected = await connection.ExecuteAsync(mergeCommand);

        // Empty the temp table so the next batch starts clean.
        var truncateSql = MergeSqlBuilder.BuildTruncateTempTable(tempTableName);
        var truncateCommand = new CommandDefinition(truncateSql, cancellationToken: cancellationToken);
        await connection.ExecuteAsync(truncateCommand);

        _logger.LogDebug(
            "Batch {BatchNumber} completed. Rows={RowCount}, Affected={RowsAffected}",
            batchNumber, batch.Count, rowsAffected);

        // The simple MERGE reports a single affected-row count, so all of it is
        // returned as inserts and the update count stays at zero.
        return (Inserted: rowsAffected, Updated: 0);
    }
    catch (SqlException ex)
    {
        var failure = $"SQL error during batch {batchNumber}: {ex.Message}";
        throw new BulkMergeException(failure, ex)
        {
            TableName = destinationTable,
            BatchNumber = batchNumber,
            RowsInBatch = batch.Count,
            SqlStatement = mergeSql
        };
    }
}
/// <summary>
/// Bulk copies a batch into the given temp table over the supplied connection.
/// </summary>
/// <param name="connection">Open connection used for the bulk copy.</param>
/// <param name="batch">Rows to copy.</param>
/// <param name="tempTableName">Destination table of the copy.</param>
/// <param name="cancellationToken">Token passed to the write operation.</param>
private async Task BulkCopyAsync<T>(
    SqlConnection connection,
    IReadOnlyList<T> batch,
    string tempTableName,
    CancellationToken cancellationToken) where T : class
{
    // No column mappings are configured on the copy.
    // NOTE(review): this presumably relies on the reader's column order matching
    // the temp table's column order; confirm.
    using (var copier = new SqlBulkCopy(connection)
    {
        DestinationTableName = tempTableName,
        BatchSize = batch.Count
    })
    {
        // The data reader factory wraps the batch for the bulk copy.
        using (var rows = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable()))
        {
            await copier.WriteToServerAsync(rows, cancellationToken);
        }
    }
}
/// <summary>
/// Executes the create-temp-table SQL that <c>MergeSqlBuilder.BuildCreateTempTable</c>
/// produces for the given temp and source table names.
/// </summary>
/// <param name="connection">Connection the statement runs on.</param>
/// <param name="tempTableName">Name of the temp table to create.</param>
/// <param name="sourceTableName">Name of the table passed to the SQL builder as the source.</param>
/// <param name="cancellationToken">Token passed to the command.</param>
private static async Task CreateTempTableAsync(
    SqlConnection connection,
    string tempTableName,
    string sourceTableName,
    CancellationToken cancellationToken)
{
    var createCommand = new CommandDefinition(
        MergeSqlBuilder.BuildCreateTempTable(tempTableName, sourceTableName),
        cancellationToken: cancellationToken);

    await connection.ExecuteAsync(createCommand);
}
/// <summary>
/// Executes the drop-temp-table SQL that <c>MergeSqlBuilder.BuildDropTempTable</c>
/// produces for the given temp table name.
/// </summary>
/// <param name="connection">Connection the statement runs on.</param>
/// <param name="tempTableName">Name of the temp table to drop.</param>
/// <param name="cancellationToken">Token passed to the command.</param>
private static async Task DropTempTableAsync(
    SqlConnection connection,
    string tempTableName,
    CancellationToken cancellationToken)
{
    var dropCommand = new CommandDefinition(
        MergeSqlBuilder.BuildDropTempTable(tempTableName),
        cancellationToken: cancellationToken);

    await connection.ExecuteAsync(dropCommand);
}
/// <inheritdoc />
public async Task<MassInsertResult> MassInsertAsync<T>(
    IAsyncEnumerable<T> data,
    string destinationTable,
    bool rebuildIndexes = true,
    int batchSize = 0,
    CancellationToken cancellationToken = default) where T : class
{
    ArgumentNullException.ThrowIfNull(data);
    ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);

    var stopwatch = Stopwatch.StartNew();

    // A non-positive batchSize selects the class-level default.
    var effectiveBatchSize = batchSize > 0 ? batchSize : DefaultBatchSize;

    _logger.LogInformation(
        "Starting mass insert to {DestinationTable}. BatchSize={BatchSize}, RebuildIndexes={RebuildIndexes}",
        destinationTable, effectiveBatchSize, rebuildIndexes);

    await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
    long totalRows = 0;

    try
    {
        // Step 1: Disable non-clustered indexes
        await DisableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken);

        // Step 2: Truncate the table
        await connection.ExecuteAsync(
            new CommandDefinition($"TRUNCATE TABLE [{destinationTable}]", cancellationToken: cancellationToken));
        _logger.LogDebug("Truncated table {DestinationTable}", destinationTable);

        // Step 3: Bulk copy data in batches
        var batch = new List<T>(effectiveBatchSize);
        await foreach (var item in data.WithCancellation(cancellationToken))
        {
            batch.Add(item);
            if (batch.Count >= effectiveBatchSize)
            {
                await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken);
                totalRows += batch.Count;
                _logger.LogDebug("Inserted batch of {Count} rows to {Table}", batch.Count, destinationTable);
                batch.Clear();
            }
        }

        // Insert remaining rows
        if (batch.Count > 0)
        {
            await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken);
            totalRows += batch.Count;
            _logger.LogDebug("Inserted final batch of {Count} rows to {Table}", batch.Count, destinationTable);
        }

        // Step 4: Rebuild or re-enable indexes
        if (rebuildIndexes)
        {
            await RebuildIndexesAsync(connection, destinationTable, cancellationToken);
        }
        else
        {
            await EnableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken);
        }

        stopwatch.Stop();
        _logger.LogInformation(
            "Mass insert to {DestinationTable} completed. TotalRows={TotalRows}, Elapsed={Elapsed}ms",
            destinationTable, totalRows, stopwatch.ElapsedMilliseconds);

        return new MassInsertResult(totalRows, stopwatch.Elapsed, rebuildIndexes);
    }
    catch (Exception ex)
    {
        // Cancellation is handled here as well: the indexes were disabled in step 1 and
        // would otherwise stay disabled when the caller cancels part-way through the load.
        if (ex is OperationCanceledException)
        {
            _logger.LogWarning(
                "Mass insert to {DestinationTable} was cancelled after {TotalRows} rows",
                destinationTable, totalRows);
        }
        else
        {
            _logger.LogError(ex, "Mass insert to {DestinationTable} failed after {TotalRows} rows",
                destinationTable, totalRows);
        }

        // Best-effort recovery. CancellationToken.None is deliberate: the caller's token
        // may already be cancelled, which would abort the recovery immediately.
        try
        {
            await EnableNonClusteredIndexesAsync(connection, destinationTable, CancellationToken.None);
        }
        catch (Exception indexEx)
        {
            _logger.LogWarning(indexEx, "Failed to re-enable indexes on {Table} after error", destinationTable);
        }

        // Rethrow the original exception (including OperationCanceledException) unchanged.
        throw;
    }
}
/// <summary>
/// Bulk-copies one batch straight into the bracket-quoted destination table,
/// with a one-hour bulk copy timeout.
/// </summary>
/// <typeparam name="T">Row type handed to the data reader factory.</typeparam>
/// <param name="connection">Connection the bulk copy writes through.</param>
/// <param name="batch">Rows to copy; the row count doubles as the bulk copy batch size.</param>
/// <param name="destinationTable">Unquoted destination table name.</param>
/// <param name="cancellationToken">Token forwarded to the bulk write.</param>
private async Task BulkCopyDirectAsync<T>(
    SqlConnection connection,
    IReadOnlyList<T> batch,
    string destinationTable,
    CancellationToken cancellationToken) where T : class
{
    using (var copier = new SqlBulkCopy(connection)
    {
        DestinationTableName = $"[{destinationTable}]",
        BatchSize = batch.Count,
        BulkCopyTimeout = 3600
    })
    {
        using (var rows = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable()))
        {
            await copier.WriteToServerAsync(rows, cancellationToken);
        }
    }
}
/// <summary>
/// Disables every enabled non-clustered index (<c>sys.indexes.type = 2</c>) on the table
/// whose name matches <paramref name="tableName"/>.
/// </summary>
/// <remarks>
/// The ALTER INDEX statements are assembled on the server. The table name is only ever
/// passed as a parameter, and both identifiers are escaped with QUOTENAME, so a name
/// containing <c>]</c> or <c>'</c> cannot break or alter the generated batch.
/// </remarks>
/// <param name="connection">Connection the batch runs on.</param>
/// <param name="tableName">Table name to match against <c>sys.tables.name</c>.</param>
/// <param name="cancellationToken">Token attached to the command.</param>
private static async Task DisableNonClusteredIndexesAsync(
    SqlConnection connection,
    string tableName,
    CancellationToken cancellationToken)
{
    const string sql = @"
DECLARE @sql NVARCHAR(MAX) = N'';
SELECT @sql = @sql + N'ALTER INDEX ' + QUOTENAME(i.name) + N' ON ' + QUOTENAME(t.name) + N' DISABLE;' + CHAR(13)
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = @tableName
AND i.type = 2
AND i.is_disabled = 0;
EXEC sp_executesql @sql;";

    await connection.ExecuteAsync(
        new CommandDefinition(sql, new { tableName }, commandTimeout: 300, cancellationToken: cancellationToken));
}
/// <summary>
/// Rebuilds (and thereby re-enables) every disabled non-clustered index
/// (<c>sys.indexes.type = 2</c>) on the table whose name matches <paramref name="tableName"/>.
/// </summary>
/// <remarks>
/// The ALTER INDEX statements are assembled on the server. The table name is only ever
/// passed as a parameter, and both identifiers are escaped with QUOTENAME, so a name
/// containing <c>]</c> or <c>'</c> cannot break or alter the generated batch.
/// </remarks>
/// <param name="connection">Connection the batch runs on.</param>
/// <param name="tableName">Table name to match against <c>sys.tables.name</c>.</param>
/// <param name="cancellationToken">Token attached to the command.</param>
private static async Task EnableNonClusteredIndexesAsync(
    SqlConnection connection,
    string tableName,
    CancellationToken cancellationToken)
{
    const string sql = @"
DECLARE @sql NVARCHAR(MAX) = N'';
SELECT @sql = @sql + N'ALTER INDEX ' + QUOTENAME(i.name) + N' ON ' + QUOTENAME(t.name) + N' REBUILD;' + CHAR(13)
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = @tableName
AND i.type = 2
AND i.is_disabled = 1;
EXEC sp_executesql @sql;";

    await connection.ExecuteAsync(
        new CommandDefinition(sql, new { tableName }, commandTimeout: 3600, cancellationToken: cancellationToken));
}
/// <summary>
/// Rebuilds all indexes on the table with a fill factor of 95, using a one-hour command timeout.
/// </summary>
/// <param name="connection">Connection the statement runs on.</param>
/// <param name="tableName">Unquoted table name; wrapped in square brackets in the statement.</param>
/// <param name="cancellationToken">Token attached to the command.</param>
private static async Task RebuildIndexesAsync(
    SqlConnection connection,
    string tableName,
    CancellationToken cancellationToken)
{
    var rebuildCommand = new CommandDefinition(
        $"ALTER INDEX ALL ON [{tableName}] REBUILD WITH (FILLFACTOR = 95)",
        commandTimeout: 3600,
        cancellationToken: cancellationToken);

    await connection.ExecuteAsync(rebuildCommand);
}
}
@@ -0,0 +1,221 @@
using Dapper;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Infrastructure;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Repository for DataUpdate operations.
/// </summary>
/// <remarks>
/// Every command is issued through a Dapper <see cref="CommandDefinition"/> so that the
/// caller's <see cref="CancellationToken"/> reaches the database command itself and not
/// only the connection factory.
/// </remarks>
public class DataUpdateRepository : IDataUpdateRepository
{
    /// <summary>NumberRecords value written by <see cref="StartUpdateAsync"/> when a row is first inserted.</summary>
    private const int InProgressMarker = -2;

    /// <summary>NumberRecords value that <see cref="CloseOpenUpdateEntriesAsync"/> assigns to rows still carrying the in-progress marker.</summary>
    private const int FailedMarker = -1;

    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<DataUpdateRepository> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="DataUpdateRepository"/> class.
    /// </summary>
    /// <param name="connectionFactory">Factory used to open a connection for each operation.</param>
    /// <param name="logger">Logger for diagnostic output.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public DataUpdateRepository(
        IDbConnectionFactory connectionFactory,
        ILogger<DataUpdateRepository> logger)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc/>
    public async Task<Dictionary<string, DataUpdate>> GetLastDataUpdatesAsync(CancellationToken cancellationToken = default)
    {
        // Latest successful row per (TableName, UpdateType), picked with ROW_NUMBER.
        const string sql = @"
WITH DU_CTE AS (
SELECT du.*,
ROW_NUMBER() OVER (PARTITION BY du.TableName, du.UpdateType ORDER BY du.StartDT DESC) RN
FROM dbo.DataUpdate AS du
WHERE du.WasSuccessful = 1
)
SELECT cte.ID,
cte.SourceSystem,
cte.SourceData,
cte.TableName,
cte.StartDT,
cte.EndDT,
cte.UpdateType,
cte.WasSuccessful,
cte.NumberRecords
FROM DU_CTE cte
WHERE cte.RN = 1";

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        var results = await connection.QueryAsync<DataUpdate>(
            new CommandDefinition(sql, commandTimeout: 120, cancellationToken: cancellationToken));

        // Key format: "<TableName>_<numeric UpdateType>".
        return results.ToDictionary(
            du => $"{du.TableName}_{(int)du.UpdateType}",
            du => du);
    }

    /// <inheritdoc/>
    public async Task<int> StartUpdateAsync(
        string sourceSystem,
        string sourceData,
        string tableName,
        UpdateTypes updateType,
        CancellationToken cancellationToken = default)
    {
        // The row starts out unsuccessful and carries the in-progress marker in NumberRecords.
        const string sql = @"
INSERT INTO dbo.DataUpdate (SourceSystem, SourceData, TableName, UpdateType, StartDT, NumberRecords, WasSuccessful)
OUTPUT INSERTED.ID
VALUES (@sourceSystem, @sourceData, @tableName, @updateType, GETUTCDATE(), @inProgressMarker, 0)";

        var parameters = new
        {
            sourceSystem,
            sourceData,
            tableName,
            updateType = (int)updateType,
            inProgressMarker = InProgressMarker
        };

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        var id = await connection.ExecuteScalarAsync<int>(
            new CommandDefinition(sql, parameters, commandTimeout: 30, cancellationToken: cancellationToken));

        _logger.LogDebug(
            "Started DataUpdate {ID} for {Table} ({Type})",
            id,
            tableName,
            updateType);

        return id;
    }

    /// <inheritdoc/>
    public async Task CompleteUpdateAsync(
        int updateId,
        long recordCount,
        bool success,
        CancellationToken cancellationToken = default)
    {
        const string sql = @"
UPDATE dbo.DataUpdate
SET EndDT = GETUTCDATE(),
NumberRecords = @recordCount,
WasSuccessful = @success
WHERE ID = @updateId";

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        await connection.ExecuteAsync(
            new CommandDefinition(
                sql,
                new { updateId, recordCount, success },
                commandTimeout: 30,
                cancellationToken: cancellationToken));

        _logger.LogDebug(
            "Completed DataUpdate {ID}: success={Success}, records={Records}",
            updateId,
            success,
            recordCount);
    }

    /// <inheritdoc/>
    public async Task<int> CloseOpenUpdateEntriesAsync(CancellationToken cancellationToken = default)
    {
        // Rows still carrying the in-progress marker are closed out as failed.
        const string sql = @"
UPDATE dbo.DataUpdate
SET EndDT = GETUTCDATE(),
WasSuccessful = 0,
NumberRecords = @failedMarker
WHERE NumberRecords = @inProgressMarker";

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        return await connection.ExecuteAsync(
            new CommandDefinition(
                sql,
                new { inProgressMarker = InProgressMarker, failedMarker = FailedMarker },
                commandTimeout: 30,
                cancellationToken: cancellationToken));
    }

    /// <inheritdoc/>
    public async Task<int> PurgeOldEntriesAsync(int retentionDays, CancellationToken cancellationToken = default)
    {
        const string sql = @"
DELETE FROM dbo.DataUpdate
WHERE StartDT < DATEADD(DAY, -@retentionDays, GETUTCDATE())";

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        return await connection.ExecuteAsync(
            new CommandDefinition(
                sql,
                new { retentionDays },
                commandTimeout: 300,
                cancellationToken: cancellationToken));
    }

    /// <inheritdoc/>
    public async Task<List<TableSyncStatus>> GetSyncStatusAsync(CancellationToken cancellationToken = default)
    {
        // Per (TableName, UpdateType): latest successful EndDT and the failure count of the last 24 hours.
        const string sql = @"
WITH LastSuccessful AS (
SELECT TableName, UpdateType,
MAX(CASE WHEN WasSuccessful = 1 THEN EndDT END) AS LastSuccessfulSync,
SUM(CASE WHEN WasSuccessful = 0 AND StartDT > DATEADD(HOUR, -24, GETUTCDATE()) THEN 1 ELSE 0 END) AS RecentFailures
FROM dbo.DataUpdate
GROUP BY TableName, UpdateType
)
SELECT TableName, UpdateType, LastSuccessfulSync, RecentFailures
FROM LastSuccessful";

        await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        var results = await connection.QueryAsync<(string TableName, int UpdateType, DateTime? LastSuccessfulSync, int RecentFailures)>(
            new CommandDefinition(sql, commandTimeout: 30, cancellationToken: cancellationToken));

        return results
            .Select(r => new TableSyncStatus(
                r.TableName,
                (UpdateTypes)r.UpdateType,
                r.LastSuccessfulSync,
                r.LastSuccessfulSync.HasValue,
                GetExpectedInterval((UpdateTypes)r.UpdateType),
                IsOverdue(r.LastSuccessfulSync, (UpdateTypes)r.UpdateType),
                r.RecentFailures))
            .ToList();
    }

    /// <summary>
    /// Gets the expected interval in minutes for an update type.
    /// </summary>
    /// <returns>60, 1440 or 10080 for Hourly, Daily or Mass respectively; 0 for any other value.</returns>
    private static int GetExpectedInterval(UpdateTypes updateType)
    {
        return updateType switch
        {
            UpdateTypes.Hourly => 60,
            UpdateTypes.Daily => 1440,
            UpdateTypes.Mass => 10080,
            _ => 0
        };
    }

    /// <summary>
    /// Checks if a sync is overdue based on last successful sync time.
    /// </summary>
    /// <returns>
    /// <c>true</c> when there is no successful sync at all, or when the last one is older
    /// than the expected interval plus a 50% grace period.
    /// </returns>
    private static bool IsOverdue(DateTime? lastSync, UpdateTypes updateType)
    {
        if (!lastSync.HasValue)
        {
            return true;
        }

        var expectedInterval = GetExpectedInterval(updateType);
        var grace = expectedInterval * 0.5; // 50% grace period
        var overdueThreshold = DateTime.UtcNow.AddMinutes(-(expectedInterval + grace));
        return lastSync.Value < overdueThreshold;
    }
}
@@ -0,0 +1,192 @@
using System.Linq.Expressions;
using System.Reflection;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Parses LINQ expressions to extract column names and build SQL clauses.
/// </summary>
internal static class ExpressionParser
{
    /// <summary>
    /// Extracts column names from an expression like: x => new { x.A, x.B } or x => x.Id.
    /// Object initializers (x => new Foo { A = x.A }) are also accepted; the names returned
    /// are always those of the properties read from the lambda parameter.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="expression"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The expression is not one of the supported shapes.</exception>
    public static IReadOnlyList<string> GetColumnNames<T>(Expression<Func<T, object>> expression)
    {
        ArgumentNullException.ThrowIfNull(expression);

        var body = expression.Body;

        // Handle conversion (boxing for value types)
        if (body is UnaryExpression { NodeType: ExpressionType.Convert } boxing)
        {
            body = boxing.Operand;
        }

        return body switch
        {
            // Single property: x => x.Id
            MemberExpression member => [GetPropertyName(member)],

            // Anonymous type: x => new { x.A, x.B }
            NewExpression newExpr => ExtractFromNewExpression(newExpr),

            // Object initializer: x => new Foo { A = x.A }
            MemberInitExpression memberInit => ExtractFromMemberInit(memberInit),

            _ => throw new ArgumentException(
                $"Unsupported expression type: {body.NodeType}. " +
                "Use either a single property (x => x.Id) or an anonymous type (x => new { x.A, x.B }).",
                nameof(expression))
        };
    }

    /// <summary>
    /// Builds a SQL condition clause from an updateWhen expression.
    /// Example: (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt
    /// Returns: "source.[LastUpdateDt] > target.[LastUpdateDt]"
    /// </summary>
    /// <returns>The SQL condition, or <c>null</c> when <paramref name="expression"/> is <c>null</c>.</returns>
    /// <exception cref="NotSupportedException">The expression contains an unsupported node.</exception>
    public static string? BuildUpdateWhenSql<T>(
        Expression<Func<T, T, bool>>? expression,
        string sourceAlias = "source",
        string targetAlias = "target")
    {
        if (expression == null)
        {
            return null;
        }

        var sourceParam = expression.Parameters[0].Name!;
        var targetParam = expression.Parameters[1].Name!;

        return TranslateExpression(expression.Body, sourceParam, sourceAlias, targetParam, targetAlias);
    }

    /// <summary>Collects the property names passed as constructor arguments (anonymous type shape).</summary>
    private static IReadOnlyList<string> ExtractFromNewExpression(NewExpression newExpr)
    {
        if (newExpr.Arguments.Count == 0)
        {
            throw new ArgumentException("Anonymous type expression must contain at least one property.");
        }

        var columns = new List<string>(newExpr.Arguments.Count);
        foreach (var arg in newExpr.Arguments)
        {
            columns.Add(GetDirectPropertyName(arg));
        }

        return columns;
    }

    /// <summary>
    /// Collects property names from an object initializer: constructor arguments first,
    /// then each member assignment, in declaration order.
    /// </summary>
    private static IReadOnlyList<string> ExtractFromMemberInit(MemberInitExpression memberInit)
    {
        var columns = new List<string>();

        foreach (var arg in memberInit.NewExpression.Arguments)
        {
            columns.Add(GetDirectPropertyName(arg));
        }

        foreach (var binding in memberInit.Bindings)
        {
            if (binding is not MemberAssignment assignment)
            {
                throw new ArgumentException(
                    $"Only simple member assignments are supported in an object initializer. " +
                    $"Found: {binding.BindingType}");
            }

            columns.Add(GetDirectPropertyName(assignment.Expression));
        }

        if (columns.Count == 0)
        {
            throw new ArgumentException("Object initializer expression must contain at least one property.");
        }

        return columns;
    }

    /// <summary>Returns the property name when <paramref name="expr"/> is a member access; throws otherwise.</summary>
    private static string GetDirectPropertyName(Expression expr)
    {
        if (expr is MemberExpression member)
        {
            return GetPropertyName(member);
        }

        throw new ArgumentException(
            $"Each property in the anonymous type must be a direct property access. " +
            $"Found: {expr.NodeType}");
    }

    private static string GetPropertyName(MemberExpression member)
    {
        // Ensure it's a property access on the parameter
        if (member.Expression is not ParameterExpression)
        {
            throw new ArgumentException(
                $"Only direct property access is supported. " +
                $"Nested properties like x.Parent.Name are not allowed. Found: {member}");
        }

        if (member.Member is not PropertyInfo prop)
        {
            throw new ArgumentException(
                $"Expression must access a property, not a field. Found: {member.Member.MemberType}");
        }

        return prop.Name;
    }

    private static string TranslateExpression(
        Expression expr,
        string sourceParam, string sourceAlias,
        string targetParam, string targetAlias)
    {
        return expr switch
        {
            BinaryExpression binary => TranslateBinaryExpression(
                binary, sourceParam, sourceAlias, targetParam, targetAlias),

            MemberExpression member => TranslateMemberExpression(
                member, sourceParam, sourceAlias, targetParam, targetAlias),

            ConstantExpression constant => TranslateConstant(constant),

            UnaryExpression unary when unary.NodeType == ExpressionType.Not =>
                $"NOT ({TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias)})",

            // Conversions carry no SQL meaning here; translate the operand.
            UnaryExpression unary when unary.NodeType == ExpressionType.Convert =>
                TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias),

            _ => throw new NotSupportedException($"Expression type {expr.NodeType} is not supported in updateWhen clause.")
        };
    }

    private static string TranslateBinaryExpression(
        BinaryExpression binary,
        string sourceParam, string sourceAlias,
        string targetParam, string targetAlias)
    {
        // "x = NULL" / "x <> NULL" never evaluate to true in SQL, so comparisons against a
        // null constant must be emitted as IS NULL / IS NOT NULL.
        if (binary.NodeType is ExpressionType.Equal or ExpressionType.NotEqual)
        {
            var leftIsNull = IsNullConstant(binary.Left);
            if (leftIsNull || IsNullConstant(binary.Right))
            {
                var operand = leftIsNull ? binary.Right : binary.Left;
                var predicate = binary.NodeType == ExpressionType.Equal ? "IS NULL" : "IS NOT NULL";
                var operandSql = TranslateExpression(operand, sourceParam, sourceAlias, targetParam, targetAlias);
                return $"{operandSql} {predicate}";
            }
        }

        var left = TranslateExpression(binary.Left, sourceParam, sourceAlias, targetParam, targetAlias);
        var right = TranslateExpression(binary.Right, sourceParam, sourceAlias, targetParam, targetAlias);

        var op = binary.NodeType switch
        {
            ExpressionType.Equal => "=",
            ExpressionType.NotEqual => "<>",
            ExpressionType.GreaterThan => ">",
            ExpressionType.GreaterThanOrEqual => ">=",
            ExpressionType.LessThan => "<",
            ExpressionType.LessThanOrEqual => "<=",
            ExpressionType.AndAlso => "AND",
            ExpressionType.OrElse => "OR",
            _ => throw new NotSupportedException($"Binary operator {binary.NodeType} is not supported.")
        };

        // Wrap AND/OR with parentheses for clarity
        if (binary.NodeType is ExpressionType.AndAlso or ExpressionType.OrElse)
        {
            return $"({left} {op} {right})";
        }

        return $"{left} {op} {right}";
    }

    /// <summary>True when the expression is a null constant, optionally wrapped in conversions.</summary>
    private static bool IsNullConstant(Expression expr)
    {
        while (expr is UnaryExpression { NodeType: ExpressionType.Convert } conversion)
        {
            expr = conversion.Operand;
        }

        return expr is ConstantExpression { Value: null };
    }

    private static string TranslateMemberExpression(
        MemberExpression member,
        string sourceParam, string sourceAlias,
        string targetParam, string targetAlias)
    {
        if (member.Expression is not ParameterExpression param)
        {
            throw new NotSupportedException(
                "Only direct property access on source or target parameters is supported.");
        }

        // Any parameter that is not the source parameter is treated as the target.
        var alias = param.Name == sourceParam ? sourceAlias : targetAlias;
        var propName = member.Member.Name;

        return $"{alias}.[{propName}]";
    }

    /// <summary>
    /// Renders a constant as a SQL literal. Formatting uses the invariant culture so the
    /// generated SQL does not depend on the culture of the thread building it.
    /// </summary>
    private static string TranslateConstant(ConstantExpression constant)
    {
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        return constant.Value switch
        {
            null => "NULL",
            string s => $"'{s.Replace("'", "''")}'",
            bool b => b ? "1" : "0",
            DateTime dt => $"'{dt.ToString("yyyy-MM-dd HH:mm:ss.fff", invariant)}'",
            IFormattable formattable => formattable.ToString(null, invariant),
            var other => other.ToString() ?? "NULL"
        };
    }
}
@@ -0,0 +1,29 @@
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Looks up <see cref="IMergeConfiguration{T}"/> instances in the service provider.
/// </summary>
internal sealed class MergeConfigurationRegistry : IMergeConfigurationRegistry
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the registry over the given service provider.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is <c>null</c>.</exception>
    public MergeConfigurationRegistry(IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Returns the merge configuration registered for <typeparamref name="T"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No configuration is registered for <typeparamref name="T"/>.</exception>
    public IMergeConfiguration<T> GetConfiguration<T>() where T : class
    {
        if (_serviceProvider.GetService(typeof(IMergeConfiguration<T>)) is IMergeConfiguration<T> registered)
        {
            return registered;
        }

        throw new InvalidOperationException(
            $"No merge configuration registered for {typeof(T).Name}. " +
            $"Register IMergeConfiguration<{typeof(T).Name}> in ServiceCollectionExtensions.");
    }

    /// <summary>
    /// Reports whether the service provider returns anything for <c>IMergeConfiguration&lt;T&gt;</c>.
    /// </summary>
    public bool HasConfiguration<T>() where T : class
        => _serviceProvider.GetService(typeof(IMergeConfiguration<T>)) is not null;
}
@@ -0,0 +1,199 @@
using System.Text;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Builds SQL statements for bulk merge operations.
/// </summary>
internal static class MergeSqlBuilder
{
    /// <summary>
    /// Builds a SQL statement to create a temp table with the same schema as the source table.
    /// </summary>
    /// <exception cref="ArgumentException">Either name is null, empty or whitespace.</exception>
    public static string BuildCreateTempTable(string tempTableName, string sourceTableName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
        ArgumentException.ThrowIfNullOrWhiteSpace(sourceTableName);

        // TOP 0 copies the column definitions without copying any rows.
        return $"SELECT TOP 0 * INTO [{tempTableName}] FROM [{sourceTableName}]";
    }

    /// <summary>
    /// Builds a SQL MERGE statement.
    /// </summary>
    /// <param name="destinationTable">Target table name.</param>
    /// <param name="tempTableName">Source temp table name.</param>
    /// <param name="matchColumns">Columns to match on (primary key).</param>
    /// <param name="updateColumns">Columns to update on match. When null or empty, no WHEN MATCHED clause is emitted.</param>
    /// <param name="updateWhenClause">Optional SQL condition for when to update. Example: "source.[LastUpdateDt] > target.[LastUpdateDt]"</param>
    /// <param name="insertColumns">Columns to insert for new records.</param>
    /// <returns>
    /// A batch that runs the MERGE, captures each row's <c>$action</c> in a table variable and
    /// ends with a single-row result set containing <c>InsertCount</c> and <c>UpdateCount</c>.
    /// </returns>
    /// <exception cref="ArgumentException">A table name is blank, or a required column list is empty.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="matchColumns"/> or <paramref name="insertColumns"/> is <c>null</c>.</exception>
    public static string BuildMerge(
        string destinationTable,
        string tempTableName,
        IReadOnlyList<string> matchColumns,
        IReadOnlyList<string> updateColumns,
        string? updateWhenClause,
        IReadOnlyList<string> insertColumns)
    {
        ValidateMergeArguments(destinationTable, tempTableName, matchColumns, insertColumns);

        var sb = new StringBuilder();

        // Declare output variables to track insert vs update.
        sb.AppendLine("DECLARE @InsertCount INT = 0, @UpdateCount INT = 0;");

        // The OUTPUT ... INTO target must exist before the MERGE runs; without this
        // declaration the batch fails to compile on the server.
        sb.AppendLine("DECLARE @ActionOutput TABLE (ActionType NVARCHAR(10));");
        sb.AppendLine();

        AppendMergeCore(sb, destinationTable, tempTableName, matchColumns, updateColumns, updateWhenClause, insertColumns);
        sb.AppendLine();

        // OUTPUT clause to count inserts and updates
        sb.AppendLine("OUTPUT $action INTO @ActionOutput;");
        sb.AppendLine();

        // Calculate counts
        sb.AppendLine("SELECT @InsertCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'INSERT';");
        sb.AppendLine("SELECT @UpdateCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'UPDATE';");
        sb.AppendLine("SELECT @InsertCount AS InsertCount, @UpdateCount AS UpdateCount;");

        return sb.ToString();
    }

    /// <summary>
    /// Builds a simpler MERGE statement that returns just the affected row count.
    /// </summary>
    /// <remarks>Parameters and exceptions are the same as for <see cref="BuildMerge"/>.</remarks>
    public static string BuildMergeSimple(
        string destinationTable,
        string tempTableName,
        IReadOnlyList<string> matchColumns,
        IReadOnlyList<string> updateColumns,
        string? updateWhenClause,
        IReadOnlyList<string> insertColumns)
    {
        ValidateMergeArguments(destinationTable, tempTableName, matchColumns, insertColumns);

        var sb = new StringBuilder();
        AppendMergeCore(sb, destinationTable, tempTableName, matchColumns, updateColumns, updateWhenClause, insertColumns);
        sb.AppendLine(";");

        return sb.ToString();
    }

    /// <summary>
    /// Builds a SQL statement to truncate a temp table.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="tempTableName"/> is null, empty or whitespace.</exception>
    public static string BuildTruncateTempTable(string tempTableName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
        return $"TRUNCATE TABLE [{tempTableName}]";
    }

    /// <summary>
    /// Builds a SQL statement to drop a temp table.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="tempTableName"/> is null, empty or whitespace.</exception>
    public static string BuildDropTempTable(string tempTableName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);

        // The existence check looks the name up in tempdb, so the drop is a no-op when the table is absent.
        return $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE [{tempTableName}]";
    }

    /// <summary>Argument checks shared by both MERGE builders.</summary>
    private static void ValidateMergeArguments(
        string destinationTable,
        string tempTableName,
        IReadOnlyList<string> matchColumns,
        IReadOnlyList<string> insertColumns)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
        ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
        ArgumentNullException.ThrowIfNull(matchColumns);
        ArgumentNullException.ThrowIfNull(insertColumns);

        if (matchColumns.Count == 0)
        {
            throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
        }

        if (insertColumns.Count == 0)
        {
            throw new ArgumentException("At least one insert column is required.", nameof(insertColumns));
        }
    }

    /// <summary>
    /// Appends the MERGE statement from "MERGE INTO" through the closing parenthesis of the
    /// VALUES list. No terminator or trailing newline is written; the caller finishes the statement.
    /// </summary>
    private static void AppendMergeCore(
        StringBuilder sb,
        string destinationTable,
        string tempTableName,
        IReadOnlyList<string> matchColumns,
        IReadOnlyList<string>? updateColumns,
        string? updateWhenClause,
        IReadOnlyList<string> insertColumns)
    {
        // MERGE statement
        sb.AppendLine($"MERGE INTO [{destinationTable}] AS target");
        sb.AppendLine($"USING [{tempTableName}] AS source");

        // ON clause
        sb.Append("ON ");
        sb.Append(string.Join(" AND ", matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));
        sb.AppendLine();

        // WHEN MATCHED (update)
        if (updateColumns != null && updateColumns.Count > 0)
        {
            sb.Append("WHEN MATCHED");
            if (!string.IsNullOrWhiteSpace(updateWhenClause))
            {
                sb.Append($" AND {updateWhenClause}");
            }

            sb.AppendLine(" THEN");
            sb.Append(" UPDATE SET ");
            sb.Append(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
            sb.AppendLine();
        }

        // WHEN NOT MATCHED (insert)
        sb.AppendLine("WHEN NOT MATCHED THEN");
        sb.Append(" INSERT (");
        sb.Append(string.Join(", ", insertColumns.Select(c => $"[{c}]")));
        sb.AppendLine(")");
        sb.Append(" VALUES (");
        sb.Append(string.Join(", ", insertColumns.Select(c => $"source.[{c}]")));
        sb.Append(")");
    }
}
@@ -0,0 +1,35 @@
using JdeScoping.Core.Interfaces;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Post-processor for MIS data; delegates the work to
/// <c>ILotFinderRepository.PostProcessMisDataAsync</c>.
/// </summary>
public class MisDataPostProcessor : IPostProcessor
{
    private readonly ILotFinderRepository _repository;
    private readonly ILogger<MisDataPostProcessor> _logger;

    /// <summary>
    /// Creates the post-processor.
    /// </summary>
    /// <param name="repository">Repository that performs the post-processing.</param>
    /// <param name="logger">Logger for diagnostic output.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public MisDataPostProcessor(
        ILotFinderRepository repository,
        ILogger<MisDataPostProcessor> logger)
    {
        ArgumentNullException.ThrowIfNull(repository);
        ArgumentNullException.ThrowIfNull(logger);

        _repository = repository;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async Task ProcessAsync(string tableName, CancellationToken cancellationToken = default)
    {
        // tableName is used for logging only; the repository call takes no table argument.
        _logger.LogDebug("Running MIS data post-processing for {Table}", tableName);

        await _repository.PostProcessMisDataAsync(cancellationToken);

        _logger.LogDebug("MIS data post-processing completed for {Table}", tableName);
    }
}
@@ -0,0 +1,247 @@
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Infrastructure;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Compares the configured sync intervals with the last recorded updates and
/// produces the list of sync tasks that are currently due.
/// </summary>
public class ScheduleChecker : IScheduleChecker
{
    private readonly IDataUpdateRepository _repository;
    private readonly IOptions<DataSyncOptions> _options;
    private readonly ILogger<ScheduleChecker> _logger;

    /// <summary>
    /// Creates the schedule checker.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public ScheduleChecker(
        IDataUpdateRepository repository,
        IOptions<DataSyncOptions> options,
        ILogger<ScheduleChecker> logger)
    {
        ArgumentNullException.ThrowIfNull(repository);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _repository = repository;
        _options = options;
        _logger = logger;
    }

    /// <inheritdoc/>
    public async Task<List<DataUpdateTask>> GetPendingTasksAsync(CancellationToken cancellationToken = default)
    {
        var lastUpdates = await _repository.GetLastDataUpdatesAsync(cancellationToken);
        var now = DateTime.UtcNow;

        // At most one task per enabled data source; sources with nothing due yield null and are dropped.
        var pending = _options.Value.DataSources
            .Where(source => source.IsEnabled)
            .Select(source => CheckConfigSchedule(source, lastUpdates, now))
            .OfType<DataUpdateTask>()
            .ToList();

        if (pending.Count == 0)
        {
            _logger.LogDebug("No pending sync tasks found");
            return pending;
        }

        _logger.LogInformation(
            "Found {Count} pending sync tasks: {Tasks}",
            pending.Count,
            string.Join(", ", pending.Select(t => $"{t.TableName}({t.UpdateType})")));

        return pending;
    }

    /// <summary>
    /// Evaluates one data source and returns the task to run, or <c>null</c> when nothing is due.
    /// Update types are checked in priority order: Mass, then Daily, then Hourly.
    /// </summary>
    private DataUpdateTask? CheckConfigSchedule(
        DataSourceConfig config,
        Dictionary<string, DataUpdate> lastUpdates,
        DateTime now)
    {
        var lastMass = FindLast(UpdateTypes.Mass);
        var lastDaily = FindLast(UpdateTypes.Daily);
        var lastHourly = FindLast(UpdateTypes.Hourly);

        // Mass has the highest priority.
        if (config.MassConfig.Enabled && NeedsMassSync(config, lastMass, now))
        {
            _logger.LogDebug(
                "Mass sync needed for {Table}: last={LastSync}, interval={Interval}m",
                config.TableName,
                lastMass?.EndDt.ToString("o") ?? "never",
                config.MassConfig.IntervalMinutes);

            return CreateTask(config, UpdateTypes.Mass, null);
        }

        if (config.DailyConfig.Enabled && NeedsDailySync(config, lastDaily, lastMass, now))
        {
            var dailyMinimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);

            _logger.LogDebug(
                "Daily sync needed for {Table}: last={LastSync}, interval={Interval}m, minDT={MinDT}",
                config.TableName,
                lastDaily?.EndDt.ToString("o") ?? "never",
                config.DailyConfig.IntervalMinutes,
                dailyMinimumDt?.ToString("o") ?? "null");

            return CreateTask(config, UpdateTypes.Daily, dailyMinimumDt);
        }

        if (!config.HourlyConfig.Enabled || !NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now))
        {
            return null;
        }

        // The hourly lookback is anchored on the last DAILY update and the daily interval
        // (legacy behavior), not on the hourly ones.
        var hourlyMinimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);

        _logger.LogDebug(
            "Hourly sync needed for {Table}: last={LastSync}, interval={Interval}m, minDT={MinDT}",
            config.TableName,
            lastHourly?.EndDt.ToString("o") ?? "never",
            config.HourlyConfig.IntervalMinutes,
            hourlyMinimumDt?.ToString("o") ?? "null");

        return CreateTask(config, UpdateTypes.Hourly, hourlyMinimumDt);

        // Last recorded update of the given type for this table, or null when none exists.
        DataUpdate? FindLast(UpdateTypes type)
            => lastUpdates.TryGetValue(GetUpdateKey(config.TableName, type), out var found) ? found : null;
    }

    /// <summary>
    /// A mass sync is due when none was recorded, the recorded one failed,
    /// or the mass interval has elapsed since it ended.
    /// </summary>
    private bool NeedsMassSync(DataSourceConfig config, DataUpdate? lastMass, DateTime now)
        => lastMass is null
           || !lastMass.WasSuccessful
           || now > lastMass.EndDt.AddMinutes(config.MassConfig.IntervalMinutes);

    /// <summary>
    /// A daily sync is never due before a mass sync exists. After that it is due when none
    /// was recorded, the recorded one failed, or the daily interval has elapsed since it ended.
    /// </summary>
    private bool NeedsDailySync(DataSourceConfig config, DataUpdate? lastDaily, DataUpdate? lastMass, DateTime now)
        => lastMass is not null
           && (lastDaily is null
               || !lastDaily.WasSuccessful
               || now > lastDaily.EndDt.AddMinutes(config.DailyConfig.IntervalMinutes));

    /// <summary>
    /// An hourly sync is never due before a mass sync exists. After that it is due when none
    /// was recorded, the recorded one failed, or the hourly interval has elapsed since it ended.
    /// </summary>
    /// <remarks><paramref name="lastDaily"/> is accepted but does not take part in the decision.</remarks>
    private bool NeedsHourlySync(
        DataSourceConfig config,
        DataUpdate? lastHourly,
        DataUpdate? lastDaily,
        DataUpdate? lastMass,
        DateTime now)
        => lastMass is not null
           && (lastHourly is null
               || !lastHourly.WasSuccessful
               || now > lastHourly.EndDt.AddMinutes(config.HourlyConfig.IntervalMinutes));

    /// <summary>
    /// Computes the lower time bound for an incremental update: the end of the given update
    /// minus (lookback multiplier × interval) minutes. Returns <c>null</c> when there is no update.
    /// </summary>
    private DateTime? CalculateMinimumDt(DataUpdate? lastUpdate, int intervalMinutes)
    {
        if (lastUpdate is not { } anchor)
        {
            return null;
        }

        return anchor.EndDt.AddMinutes(-(_options.Value.LookbackMultiplier * intervalMinutes));
    }

    /// <summary>
    /// Builds the task descriptor for the given source, update type and lower time bound.
    /// </summary>
    private static DataUpdateTask CreateTask(DataSourceConfig config, UpdateTypes updateType, DateTime? minimumDt)
        => new()
        {
            TableName = config.TableName,
            SourceSystem = config.SourceSystem,
            SourceData = config.SourceData,
            UpdateType = updateType,
            MinimumDt = minimumDt,
            Config = config
        };

    /// <summary>
    /// Composes the lookup key "&lt;tableName&gt;_&lt;numeric update type&gt;" used for the last-updates dictionary.
    /// </summary>
    private static string GetUpdateKey(string tableName, UpdateTypes updateType)
        => $"{tableName}_{(int)updateType}";
}
@@ -0,0 +1,223 @@
using System.Data;
using System.Reflection;
using Dapper;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Validates data against database table schema.
/// </summary>
internal sealed class SchemaValidator : ISchemaValidator
{
/// <inheritdoc />
public async Task<IReadOnlyList<ColumnSchema>> GetTableSchemaAsync(
    IDbConnection connection,
    string tableName,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(connection);
    ArgumentException.ThrowIfNullOrWhiteSpace(tableName);

    // Column metadata comes from INFORMATION_SCHEMA.COLUMNS, in ordinal order.
    const string sql = """
        SELECT
        COLUMN_NAME AS Name,
        DATA_TYPE AS DataType,
        CHARACTER_MAXIMUM_LENGTH AS MaxLength,
        NUMERIC_PRECISION AS [Precision],
        NUMERIC_SCALE AS Scale,
        CASE WHEN IS_NULLABLE = 'YES' THEN 1 ELSE 0 END AS IsNullable,
        ORDINAL_POSITION AS OrdinalPosition
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = @Schema AND TABLE_NAME = @Table
        ORDER BY ORDINAL_POSITION
        """;

    // Split the incoming name into its schema and table parts.
    var (schemaName, table) = ParseTableName(tableName);

    var schemaQuery = new CommandDefinition(
        sql,
        new { Schema = schemaName, Table = table },
        cancellationToken: cancellationToken);

    var rows = await connection.QueryAsync<ColumnSchema>(schemaQuery);
    return rows.ToList();
}
/// <inheritdoc />
public IReadOnlyList<ValidationError> ValidateBatch<T>(
    IReadOnlyList<T> data,
    IReadOnlyList<ColumnSchema> schema,
    int maxErrors = 100) where T : class
{
    ArgumentNullException.ThrowIfNull(data);
    ArgumentNullException.ThrowIfNull(schema);

    if (data.Count == 0 || schema.Count == 0)
    {
        return [];
    }

    var properties = typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance)
        .ToDictionary(p => p.Name, StringComparer.OrdinalIgnoreCase);

    // Pair each column with its same-named property (case-insensitive) once, in schema
    // order, instead of repeating the lookup for every row. Columns with no matching
    // property are not validated.
    var checks = new List<(ColumnSchema Column, PropertyInfo Property)>(schema.Count);
    foreach (var column in schema)
    {
        if (properties.TryGetValue(column.Name, out var property))
        {
            checks.Add((column, property));
        }
    }

    var errors = new List<ValidationError>();

    for (int rowIndex = 0; rowIndex < data.Count; rowIndex++)
    {
        var row = data[rowIndex];
        if (row == null)
        {
            continue;
        }

        foreach (var (column, property) in checks)
        {
            var error = ValidateValue(rowIndex, column, property.GetValue(row));
            if (error == null)
            {
                continue;
            }

            errors.Add(error);

            // A non-positive maxErrors means "no limit".
            if (maxErrors > 0 && errors.Count >= maxErrors)
            {
                return errors;
            }
        }
    }

    return errors;
}
/// <summary>
/// Validates a single value against one column definition.
/// </summary>
/// <param name="rowIndex">Index of the row the value came from; copied into the error.</param>
/// <param name="column">Column definition to validate against.</param>
/// <param name="value">Value read from the row; may be null.</param>
/// <returns>The first problem found (nullability, then length, then decimal range), or null when the value passes.</returns>
private static ValidationError? ValidateValue(int rowIndex, ColumnSchema column, object? value)
{
    // Null and empty strings are both treated as "no value".
    if (value is null || value is string { Length: 0 })
    {
        if (!column.IsNullable && !IsIdentityOrComputed(column))
        {
            return new ValidationError(
                rowIndex,
                column.Name,
                value,
                $"Column '{column.Name}' does not allow null values.");
        }

        return null;
    }

    // Length check. Binary payloads are measured in bytes; calling ToString() on a byte[] would
    // measure the literal text "System.Byte[]" instead of the data.
    if (column.MaxLength.HasValue && column.MaxLength.Value > 0)
    {
        int actualLength = value switch
        {
            string text => text.Length,
            byte[] bytes => bytes.Length,
            _ => value.ToString()?.Length ?? 0,
        };

        if (actualLength > column.MaxLength.Value)
        {
            return new ValidationError(
                rowIndex,
                column.Name,
                value,
                $"Value for '{column.Name}' exceeds maximum length of {column.MaxLength}. Actual length: {actualLength}.");
        }
    }

    // Precision/scale check for decimal-like columns.
    if (column.Precision.HasValue && column.Scale.HasValue && IsDecimalType(column.DataType))
    {
        if (value is decimal exact)
        {
            return ValidateDecimal(rowIndex, column, exact);
        }

        if (value is double || value is float)
        {
            decimal converted;
            try
            {
                converted = Convert.ToDecimal(value);
            }
            catch (OverflowException)
            {
                // NaN, infinities and magnitudes beyond the decimal range cannot be converted.
                return new ValidationError(
                    rowIndex,
                    column.Name,
                    value,
                    $"Value for '{column.Name}' cannot be converted to decimal.");
            }

            return ValidateDecimal(rowIndex, column, converted);
        }
    }

    return null;
}
/// <summary>
/// Checks that a decimal value has no more integer digits than the column's precision and scale allow.
/// </summary>
/// <param name="rowIndex">Index of the row the value came from; copied into the error.</param>
/// <param name="column">Column definition; <c>Precision</c> and <c>Scale</c> must both be set.</param>
/// <param name="value">Value to check.</param>
/// <returns>An error when the integer part is too wide; otherwise null.</returns>
private static ValidationError? ValidateDecimal(int rowIndex, ColumnSchema column, decimal value)
{
    // Digits left of the decimal point, ignoring the sign. Zero and pure fractions have none,
    // so they fit columns whose precision equals their scale (for example decimal(5,5)).
    var integerPart = decimal.Truncate(Math.Abs(value));
    var integerDigits = integerPart == 0m
        ? 0
        : integerPart.ToString(System.Globalization.CultureInfo.InvariantCulture).Length;

    var maxTotalDigits = column.Precision!.Value;
    var maxDecimalDigits = column.Scale!.Value;
    var maxIntegerDigits = maxTotalDigits - maxDecimalDigits;

    if (integerDigits > maxIntegerDigits)
    {
        return new ValidationError(
            rowIndex,
            column.Name,
            value,
            $"Value {value} for '{column.Name}' exceeds maximum integer digits. " +
            $"Maximum: {maxIntegerDigits}, Actual: {integerDigits}.");
    }

    // Note: We don't validate decimal digits as SQL Server will truncate/round them
    return null;
}
/// <summary>
/// Tells whether a data type name is one of the fixed-point types checked for precision and scale.
/// </summary>
/// <param name="dataType">Data type name; compared ignoring case.</param>
/// <returns>True for "decimal", "numeric", "money" and "smallmoney"; otherwise false.</returns>
private static bool IsDecimalType(string dataType)
{
    bool Is(string candidate) => dataType.Equals(candidate, StringComparison.OrdinalIgnoreCase);

    return Is("decimal") || Is("numeric") || Is("money") || Is("smallmoney");
}
/// <summary>
/// Name-based guess at whether a column is an identity or computed column.
/// </summary>
/// <param name="column">Column definition to inspect.</param>
/// <returns>True when the column name ends with "Id" (ignoring case), which includes a column named exactly "Id".</returns>
private static bool IsIdentityOrComputed(ColumnSchema column)
{
    // Identity and computed columns may legitimately be null in the source data.
    // This is only a heuristic: a real identity check would need extra metadata, and any
    // name ending in "id" matches (for example "Valid").
    var columnName = column.Name;
    return columnName.EndsWith("Id", StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Splits a table name into its schema and table parts.
/// </summary>
/// <param name="tableName">Table name, optionally bracketed and optionally prefixed with a schema.</param>
/// <returns>
/// The schema and table. The schema is "dbo" when none is given or when the name starts with '#';
/// in the latter case the whole bracket-free name is returned as the table.
/// </returns>
/// <exception cref="ArgumentException">The name has more than two dot-separated parts.</exception>
private static (string Schema, string Table) ParseTableName(string tableName)
{
    // All square brackets are dropped before any other processing.
    var bare = tableName.Replace("[", "").Replace("]", "");

    // Temp tables are returned as-is, without splitting on dots.
    if (bare.Length > 0 && bare[0] == '#')
    {
        return ("dbo", bare);
    }

    return bare.Split('.') switch
    {
        [var only] => ("dbo", only),
        [var schema, var table] => (schema, table),
        _ => throw new ArgumentException($"Invalid table name format: {tableName}")
    };
}
}
@@ -0,0 +1,104 @@
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Orchestrates parallel execution of data sync operations.
/// </summary>
/// <remarks>
/// Every pending task runs in its own dependency-injection scope. A failing task is logged and
/// counted without stopping the others; cancellation is rethrown and ends the cycle.
/// </remarks>
public class SyncOrchestrator : ISyncOrchestrator
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IScheduleChecker _scheduleChecker;
    private readonly IOptions<DataSyncOptions> _options;
    private readonly ILogger<SyncOrchestrator> _logger;
    private readonly DataSyncMetrics _metrics;

    /// <summary>
    /// Initializes a new instance of the <see cref="SyncOrchestrator"/> class.
    /// </summary>
    /// <param name="scopeFactory">Creates the per-task service scope.</param>
    /// <param name="scheduleChecker">Supplies the tasks that are due.</param>
    /// <param name="options">Sync options; <c>MaxDegreeOfParallelism</c> is read per cycle.</param>
    /// <param name="logger">Logger for cycle and task outcomes.</param>
    /// <param name="metrics">Receives the cycle-completed measurement.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public SyncOrchestrator(
        IServiceScopeFactory scopeFactory,
        IScheduleChecker scheduleChecker,
        IOptions<DataSyncOptions> options,
        ILogger<SyncOrchestrator> logger,
        DataSyncMetrics metrics)
    {
        _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
        _scheduleChecker = scheduleChecker ?? throw new ArgumentNullException(nameof(scheduleChecker));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <inheritdoc/>
    public async Task ExecutePendingSyncsAsync(CancellationToken cancellationToken = default)
    {
        var pendingTasks = await _scheduleChecker.GetPendingTasksAsync(cancellationToken);
        if (pendingTasks.Count == 0)
        {
            return;
        }

        _logger.LogInformation(
            "Executing {Count} sync tasks with MaxDegreeOfParallelism={MaxDop}",
            pendingTasks.Count,
            _options.Value.MaxDegreeOfParallelism);

        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = _options.Value.MaxDegreeOfParallelism,
            CancellationToken = cancellationToken
        };

        // Stopwatch is monotonic; subtracting wall-clock timestamps can be skewed by clock adjustments.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        var completedCount = 0;
        var failedCount = 0;

        await Parallel.ForEachAsync(pendingTasks, parallelOptions, async (task, ct) =>
        {
            // Each task gets its own scope for proper isolation
            await using var scope = _scopeFactory.CreateAsyncScope();
            try
            {
                var operation = scope.ServiceProvider.GetRequiredService<ITableSyncOperation>();
                await operation.ExecuteAsync(task, ct);
                Interlocked.Increment(ref completedCount);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                _logger.LogWarning(
                    "Sync operation for {Table} ({Type}) was cancelled",
                    task.TableName,
                    task.UpdateType);
                throw;
            }
            catch (Exception ex)
            {
                Interlocked.Increment(ref failedCount);
                _logger.LogError(
                    ex,
                    "Sync operation for {Table} ({Type}) failed",
                    task.TableName,
                    task.UpdateType);
                // Don't rethrow - let other tasks continue
            }
        });

        stopwatch.Stop();
        var elapsed = stopwatch.Elapsed;

        _logger.LogInformation(
            "Sync cycle completed: {Completed} succeeded, {Failed} failed, elapsed={Elapsed:F1}s",
            completedCount,
            failedCount,
            elapsed.TotalSeconds);

        _metrics.RecordCycleCompleted(completedCount, failedCount, elapsed.TotalSeconds);
    }
}
@@ -0,0 +1,415 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Interfaces;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Executes a single table sync operation.
/// </summary>
public class TableSyncOperation : ITableSyncOperation
{
private readonly IServiceProvider _serviceProvider;
private readonly IDbConnectionFactory _connectionFactory;
private readonly IDataUpdateRepository _updateRepository;
private readonly IBulkMergeHelper _bulkMergeHelper;
private readonly IMergeConfigurationRegistry _configRegistry;
private readonly IOptions<DataSyncOptions> _options;
private readonly ILogger<TableSyncOperation> _logger;
private readonly DataSyncMetrics _metrics;
/// <summary>
/// Initializes a new instance of the <see cref="TableSyncOperation"/> class.
/// </summary>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public TableSyncOperation(
    IServiceProvider serviceProvider,
    IDbConnectionFactory connectionFactory,
    IDataUpdateRepository updateRepository,
    IBulkMergeHelper bulkMergeHelper,
    IMergeConfigurationRegistry configRegistry,
    IOptions<DataSyncOptions> options,
    ILogger<TableSyncOperation> logger,
    DataSyncMetrics metrics)
{
    // Validate in parameter order so the first missing dependency is the one reported.
    ArgumentNullException.ThrowIfNull(serviceProvider);
    ArgumentNullException.ThrowIfNull(connectionFactory);
    ArgumentNullException.ThrowIfNull(updateRepository);
    ArgumentNullException.ThrowIfNull(bulkMergeHelper);
    ArgumentNullException.ThrowIfNull(configRegistry);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);
    ArgumentNullException.ThrowIfNull(metrics);

    _serviceProvider = serviceProvider;
    _connectionFactory = connectionFactory;
    _updateRepository = updateRepository;
    _bulkMergeHelper = bulkMergeHelper;
    _configRegistry = configRegistry;
    _options = options;
    _logger = logger;
    _metrics = metrics;
}
/// <inheritdoc/>
/// <remarks>
/// Records the start of the update, runs the sync, and always records the outcome afterwards
/// (record count, or -1 on failure) without honouring cancellation for that final write.
/// Failures and cancellation are logged, counted as failed operations, and rethrown.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
public async Task ExecuteAsync(DataUpdateTask task, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(task);

    using var logScope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["TableName"] = task.TableName,
        ["UpdateType"] = task.UpdateType.ToString(),
        ["OperationId"] = task.OperationId.ToString()
    });
    using var activity = DataSyncActivitySource.StartSyncOperation(task.TableName, task.UpdateType.ToString());

    var stopwatch = Stopwatch.StartNew();
    _logger.LogInformation(
        "Starting {UpdateType} sync for {Table}, MinimumDT={MinDT}",
        task.UpdateType,
        task.TableName,
        task.MinimumDt?.ToString("o") ?? "null");
    _metrics.RecordOperationStarted(task.TableName, task.UpdateType.ToString());

    // Log start of data update
    var updateId = await _updateRepository.StartUpdateAsync(
        task.SourceSystem,
        task.SourceData,
        task.TableName,
        task.UpdateType,
        cancellationToken);

    long recordCount = 0;
    var success = false;
    try
    {
        recordCount = await ExecuteSyncCoreAsync(task, cancellationToken);
        success = true;
        stopwatch.Stop();

        _logger.LogInformation(
            "Completed {UpdateType} sync for {Table}: {RecordCount:N0} records in {Elapsed:F1}s",
            task.UpdateType,
            task.TableName,
            recordCount,
            stopwatch.Elapsed.TotalSeconds);
        _metrics.RecordOperationCompleted(
            task.TableName,
            task.UpdateType.ToString(),
            recordCount,
            stopwatch.Elapsed.TotalSeconds);
        activity?.SetStatus(ActivityStatusCode.Ok);
        activity?.SetTag("record.count", recordCount);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        stopwatch.Stop();
        _logger.LogWarning(
            "Sync operation cancelled for {Table} after {Elapsed:F1}s",
            task.TableName,
            stopwatch.Elapsed.TotalSeconds);
        activity?.SetStatus(ActivityStatusCode.Error, "Operation cancelled");
        _metrics.RecordOperationFailed(task.TableName, task.UpdateType.ToString());
        throw;
    }
    catch (Exception ex)
    {
        stopwatch.Stop();
        _logger.LogError(
            ex,
            "Sync operation failed for {Table} after {Elapsed:F1}s",
            task.TableName,
            stopwatch.Elapsed.TotalSeconds);
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        _metrics.RecordOperationFailed(task.TableName, task.UpdateType.ToString());
        throw;
    }
    finally
    {
        try
        {
            // Update DataUpdate record
            await _updateRepository.CompleteUpdateAsync(
                updateId,
                success ? recordCount : -1,
                success,
                CancellationToken.None); // Use CancellationToken.None to ensure we log the result
        }
        catch (Exception completionEx) when (!success)
        {
            // The sync already failed; keep that exception as the one callers see and
            // only log the secondary failure to record the outcome.
            _logger.LogError(
                completionEx,
                "Failed to record the outcome of the failed sync for {Table}",
                task.TableName);
        }
    }
}
/// <summary>
/// Core sync logic: resolves the configured fetcher, starts the fetch, and routes the resulting
/// stream to the mass-insert path or the merge path.
/// </summary>
/// <param name="task">Task describing the table, update type and fetcher to use.</param>
/// <param name="cancellationToken">Token passed to the fetcher and to the update path.</param>
/// <returns>The record count reported by the chosen update path.</returns>
/// <exception cref="InvalidOperationException">
/// The fetcher has no <c>FetchAsync</c> method, <c>FetchAsync</c> returned null, or the returned
/// object's element type could not be determined or is not a class.
/// </exception>
private async Task<long> ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
    // Get the fetcher for this entity type
    var fetcherType = ResolveFetcherType(task.Config.FetcherTypeName);
    var fetcher = _serviceProvider.GetRequiredService(fetcherType);

    // Use reflection to call FetchAsync on the fetcher
    var fetchMethod = fetcher.GetType().GetMethod("FetchAsync")
        ?? throw new InvalidOperationException($"FetchAsync method not found on {fetcher.GetType().Name}");

    // A null result would otherwise surface as a NullReferenceException further down.
    var asyncEnumerable = fetchMethod.Invoke(fetcher, [task.MinimumDt, cancellationToken])
        ?? throw new InvalidOperationException($"FetchAsync on {fetcher.GetType().Name} returned null");

    // Get the element type for typed operations
    var (_, elementType) = FindGetAsyncEnumerator(asyncEnumerable.GetType());
    if (elementType == null || !elementType.IsClass)
    {
        throw new InvalidOperationException(
            $"Fetcher element type must be a class: {asyncEnumerable.GetType().Name}");
    }

    // Mass updates with pre-purge enabled take the bulk-insert path
    if (task.UpdateType == UpdateTypes.Mass && task.ScheduleConfig.PrepurgeData)
    {
        return await ExecuteMassUpdateAsync(
            asyncEnumerable,
            task,
            elementType,
            cancellationToken);
    }

    // Everything else is merged into the existing data
    return await ExecuteIncrementalUpdateAsync(
        asyncEnumerable,
        task,
        elementType,
        cancellationToken);
}
/// <summary>
/// Runs the mass update by invoking <c>ExecuteMassUpdateTypedAsync</c> with
/// <paramref name="elementType"/> as its type argument.
/// </summary>
/// <param name="asyncEnumerable">Stream returned by the fetcher.</param>
/// <param name="task">Task being executed.</param>
/// <param name="elementType">Element type of the stream.</param>
/// <param name="cancellationToken">Token forwarded to the typed helper.</param>
/// <returns>The task produced by the typed helper.</returns>
private Task<long> ExecuteMassUpdateAsync(
    object asyncEnumerable,
    DataUpdateTask task,
    Type elementType,
    CancellationToken cancellationToken)
{
    _logger.LogDebug("Executing mass update for {Table}", task.TableName);

    const System.Reflection.BindingFlags privateInstance =
        System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;

    // The element type is only known at run time, so the generic helper is closed via reflection.
    var openHelper = typeof(TableSyncOperation).GetMethod(nameof(ExecuteMassUpdateTypedAsync), privateInstance)!;
    var typedHelper = openHelper.MakeGenericMethod(elementType);

    object?[] arguments = [asyncEnumerable, task, cancellationToken];
    return (Task<long>)typedHelper.Invoke(this, arguments)!;
}
/// <summary>
/// Typed mass update: bulk-inserts the stream into the table configured for
/// <typeparamref name="T"/>, then runs the post processor if one is configured.
/// </summary>
/// <typeparam name="T">Element type of the stream.</typeparam>
/// <param name="data">Records to insert.</param>
/// <param name="task">Task being executed; supplies the re-index flag.</param>
/// <param name="cancellationToken">Token forwarded to the insert and the post processor.</param>
/// <returns>The total number of rows inserted, as reported by the bulk helper.</returns>
private async Task<long> ExecuteMassUpdateTypedAsync<T>(
    IAsyncEnumerable<T> data,
    DataUpdateTask task,
    CancellationToken cancellationToken) where T : class
{
    var tableConfig = _configRegistry.GetConfiguration<T>();

    var insertResult = await _bulkMergeHelper.MassInsertAsync(
        data,
        tableConfig.TableName,
        rebuildIndexes: task.ScheduleConfig.ReIndexData,
        batchSize: _options.Value.BulkCopyBatchSize,
        cancellationToken: cancellationToken);

    // The post processor runs only after the insert has finished.
    await RunPostProcessorAsync(task, cancellationToken);

    return insertResult.TotalRowsInserted;
}
/// <summary>
/// Runs the incremental update by invoking <c>ExecuteIncrementalUpdateTypedAsync</c> with
/// <paramref name="elementType"/> as its type argument.
/// </summary>
/// <param name="asyncEnumerable">Stream returned by the fetcher.</param>
/// <param name="task">Task being executed.</param>
/// <param name="elementType">Element type of the stream.</param>
/// <param name="cancellationToken">Token forwarded to the typed helper.</param>
/// <returns>The task produced by the typed helper.</returns>
private Task<long> ExecuteIncrementalUpdateAsync(
    object asyncEnumerable,
    DataUpdateTask task,
    Type elementType,
    CancellationToken cancellationToken)
{
    _logger.LogDebug("Executing incremental update for {Table}", task.TableName);

    const System.Reflection.BindingFlags privateInstance =
        System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;

    // The element type is only known at run time, so the generic helper is closed via reflection.
    var openHelper = typeof(TableSyncOperation).GetMethod(nameof(ExecuteIncrementalUpdateTypedAsync), privateInstance)!;
    var typedHelper = openHelper.MakeGenericMethod(elementType);

    object?[] arguments = [asyncEnumerable, task, cancellationToken];
    return (Task<long>)typedHelper.Invoke(this, arguments)!;
}
/// <summary>
/// Typed incremental update: merges the stream into the table configured for
/// <typeparamref name="T"/>, then runs the post processor if one is configured.
/// </summary>
/// <typeparam name="T">Element type of the stream.</typeparam>
/// <param name="data">Records to merge.</param>
/// <param name="task">Task being executed; used for the post-processing step.</param>
/// <param name="cancellationToken">Token forwarded to the merge and the post processor.</param>
/// <returns>The total number of rows processed, as reported by the bulk helper.</returns>
private async Task<long> ExecuteIncrementalUpdateTypedAsync<T>(
    IAsyncEnumerable<T> data,
    DataUpdateTask task,
    CancellationToken cancellationToken) where T : class
{
    var mergeConfig = _configRegistry.GetConfiguration<T>();

    var mergeResult = await _bulkMergeHelper.MergeAsync(
        data,
        mergeConfig.TableName,
        mergeConfig.MatchOn,
        mergeConfig.UpdateColumns,
        mergeConfig.UpdateWhen,
        mergeConfig.InsertColumns,
        batchSize: _options.Value.BatchSize,
        cancellationToken: cancellationToken);

    // The post processor runs only after the merge has finished.
    await RunPostProcessorAsync(task, cancellationToken);

    return mergeResult.TotalRowsProcessed;
}
/// <summary>
/// Resolves and runs the task's post processor; does nothing when no post processor type name is set.
/// </summary>
/// <param name="task">Task whose configuration names the post processor.</param>
/// <param name="cancellationToken">Token forwarded to the post processor.</param>
private async Task RunPostProcessorAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
    var processorName = task.Config.PostProcessorTypeName;
    if (processorName is null || processorName.Length == 0)
    {
        return;
    }

    _logger.LogDebug("Running post processor for {Table}", task.TableName);

    var processorType = ResolvePostProcessorType(processorName);
    var processor = (IPostProcessor)_serviceProvider.GetRequiredService(processorType);
    await processor.ProcessAsync(task.TableName, cancellationToken);
}
/// <summary>
/// Resolves the fetcher type from the type name by scanning every assembly loaded in the current
/// application domain.
/// </summary>
/// <param name="fetcherTypeName">Simple name or full name of the fetcher type.</param>
/// <returns>The single type whose <c>Name</c> or <c>FullName</c> equals <paramref name="fetcherTypeName"/>.</returns>
/// <exception cref="InvalidOperationException">No type, or more than one type, matches.</exception>
private static Type ResolveFetcherType(string fetcherTypeName)
{
    // Best effort per assembly: when only some types fail to load, keep the ones that did load
    // instead of discarding the whole assembly.
    static IEnumerable<Type> LoadableTypes(System.Reflection.Assembly assembly)
    {
        try
        {
            return assembly.GetTypes();
        }
        catch (System.Reflection.ReflectionTypeLoadException ex)
        {
            // Types holds null entries for the types that could not be loaded.
            return ex.Types.OfType<Type>();
        }
        catch (Exception)
        {
            // Any other failure skips the assembly rather than failing the whole lookup.
            return [];
        }
    }

    var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(LoadableTypes)
        .Where(t => t.Name == fetcherTypeName || t.FullName == fetcherTypeName)
        .ToList();

    if (candidateTypes.Count == 0)
    {
        throw new InvalidOperationException($"Fetcher type '{fetcherTypeName}' not found");
    }

    if (candidateTypes.Count > 1)
    {
        throw new InvalidOperationException(
            $"Multiple types found matching '{fetcherTypeName}': {string.Join(", ", candidateTypes.Select(t => t.FullName))}");
    }

    return candidateTypes[0];
}
/// <summary>
/// Resolves the post processor type from the type name by scanning every assembly loaded in the
/// current application domain.
/// </summary>
/// <param name="postProcessorTypeName">Simple name or full name of the post processor type.</param>
/// <returns>
/// The first type assignable to <see cref="IPostProcessor"/> whose <c>Name</c> or <c>FullName</c>
/// equals <paramref name="postProcessorTypeName"/>. When several types match, the first one found wins.
/// </returns>
/// <exception cref="InvalidOperationException">No matching type was found.</exception>
private static Type ResolvePostProcessorType(string postProcessorTypeName)
{
    // Best effort per assembly: when only some types fail to load, keep the ones that did load
    // instead of discarding the whole assembly.
    static IEnumerable<Type> LoadableTypes(System.Reflection.Assembly assembly)
    {
        try
        {
            return assembly.GetTypes();
        }
        catch (System.Reflection.ReflectionTypeLoadException ex)
        {
            // Types holds null entries for the types that could not be loaded.
            return ex.Types.OfType<Type>();
        }
        catch (Exception)
        {
            // Any other failure skips the assembly rather than failing the whole lookup.
            return [];
        }
    }

    var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(LoadableTypes)
        .Where(t => t.Name == postProcessorTypeName || t.FullName == postProcessorTypeName)
        .Where(t => typeof(IPostProcessor).IsAssignableFrom(t))
        .ToList();

    if (candidateTypes.Count == 0)
    {
        throw new InvalidOperationException($"PostProcessor type '{postProcessorTypeName}' not found");
    }

    return candidateTypes[0];
}
/// <summary>
/// Enumerates an untyped async sequence, yielding each element as <see cref="object"/>.
/// The element type is discovered from the object's <c>IAsyncEnumerable&lt;T&gt;</c> interface and
/// the enumeration itself is delegated to <c>EnumerateAsyncGeneric</c> closed over that type.
/// </summary>
/// <param name="asyncEnumerable">Object expected to implement <c>IAsyncEnumerable&lt;T&gt;</c>.</param>
/// <param name="cancellationToken">Token observed while enumerating.</param>
/// <returns>The elements of the sequence, in order.</returns>
/// <exception cref="InvalidOperationException">
/// Raised during enumeration when the object does not implement <c>IAsyncEnumerable&lt;T&gt;</c>.
/// </exception>
private static async IAsyncEnumerable<object> EnumerateAsync(
    object asyncEnumerable,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var lookup = FindGetAsyncEnumerator(asyncEnumerable.GetType());
    if (lookup.Method is null || lookup.ElementType is null)
    {
        throw new InvalidOperationException(
            $"Type {asyncEnumerable.GetType().Name} does not implement IAsyncEnumerable<T>");
    }

    const System.Reflection.BindingFlags privateStatic =
        System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static;

    // Close the generic helper over the element type so the enumerator itself is used without reflection.
    var adapter = typeof(TableSyncOperation)
        .GetMethod(nameof(EnumerateAsyncGeneric), privateStatic)!
        .MakeGenericMethod(lookup.ElementType);

    object?[] arguments = [asyncEnumerable, cancellationToken];
    var boxedItems = (IAsyncEnumerable<object>)adapter.Invoke(null, arguments)!;

    await foreach (var element in boxedItems.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        yield return element;
    }
}
/// <summary>
/// Looks up the first <c>IAsyncEnumerable&lt;T&gt;</c> interface implemented by a type.
/// </summary>
/// <param name="type">Type to inspect; only the interfaces it implements are considered.</param>
/// <returns>
/// The interface's <c>GetAsyncEnumerator</c> method and its element type, or a pair of nulls when
/// the type implements no <c>IAsyncEnumerable&lt;T&gt;</c>.
/// </returns>
private static (System.Reflection.MethodInfo? Method, Type? ElementType) FindGetAsyncEnumerator(Type type)
{
    foreach (var candidate in type.GetInterfaces())
    {
        if (!candidate.IsGenericType || candidate.GetGenericTypeDefinition() != typeof(IAsyncEnumerable<>))
        {
            continue;
        }

        var itemType = candidate.GetGenericArguments()[0];

        // Prefer the overload taking a CancellationToken; fall back to a parameterless one.
        var enumeratorMethod = candidate.GetMethod("GetAsyncEnumerator", [typeof(CancellationToken)])
            ?? candidate.GetMethod("GetAsyncEnumerator", Type.EmptyTypes);

        return (enumeratorMethod, itemType);
    }

    return (null, null);
}
/// <summary>
/// Re-yields every element of a typed async sequence as <see cref="object"/>.
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to enumerate.</param>
/// <param name="cancellationToken">Token passed to the source enumerator.</param>
/// <returns>The source elements, in order.</returns>
private static async IAsyncEnumerable<object> EnumerateAsyncGeneric<T>(
    IAsyncEnumerable<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var enumerator = source.GetAsyncEnumerator(cancellationToken);
    try
    {
        while (await enumerator.MoveNextAsync().ConfigureAwait(false))
        {
            yield return enumerator.Current!;
        }
    }
    finally
    {
        // Always release the enumerator, including when the consumer stops early.
        await enumerator.DisposeAsync().ConfigureAwait(false);
    }
}
}
@@ -0,0 +1,53 @@
using System.Diagnostics;
namespace JdeScoping.DataSync.Telemetry;
/// <summary>
/// Activity source for distributed tracing of data sync operations.
/// </summary>
public static class DataSyncActivitySource
{
    /// <summary>
    /// The activity source for data sync operations.
    /// </summary>
    public static readonly ActivitySource Source = new ActivitySource("JdeScoping.DataSync", "1.0.0");

    /// <summary>
    /// Starts a "SyncTable" activity tagged with the table name and update type.
    /// </summary>
    /// <param name="tableName">The table being synced.</param>
    /// <param name="updateType">The type of update (Hourly, Daily, Mass).</param>
    /// <returns>The started activity, or null if no listeners.</returns>
    public static Activity? StartSyncOperation(string tableName, string updateType)
    {
        var activity = StartForTable("SyncTable", tableName);
        activity?.SetTag("update.type", updateType);
        return activity;
    }

    /// <summary>
    /// Starts a "ProcessBatch" activity tagged with the table name, batch number and record count.
    /// </summary>
    /// <param name="tableName">The table being synced.</param>
    /// <param name="batchNumber">The batch number.</param>
    /// <param name="recordCount">The number of records in the batch.</param>
    /// <returns>The started activity, or null if no listeners.</returns>
    public static Activity? StartBatchOperation(string tableName, int batchNumber, int recordCount)
    {
        var activity = StartForTable("ProcessBatch", tableName);
        activity?.SetTag("batch.number", batchNumber);
        activity?.SetTag("record.count", recordCount);
        return activity;
    }

    /// <summary>
    /// Starts an "ExecuteMerge" activity tagged with the table name.
    /// </summary>
    /// <param name="tableName">The destination table.</param>
    /// <returns>The started activity, or null if no listeners.</returns>
    public static Activity? StartMergeOperation(string tableName)
    {
        return StartForTable("ExecuteMerge", tableName);
    }

    /// <summary>
    /// Starts an activity with the given operation name and attaches the "table.name" tag.
    /// </summary>
    private static Activity? StartForTable(string operationName, string tableName)
    {
        var activity = Source.StartActivity(operationName);
        activity?.SetTag("table.name", tableName);
        return activity;
    }
}
@@ -0,0 +1,114 @@
using System.Diagnostics.Metrics;
namespace JdeScoping.DataSync.Telemetry;
/// <summary>
/// Metrics for data sync operations, published on the "JdeScoping.DataSync" meter.
/// </summary>
public class DataSyncMetrics
{
    private readonly Counter<long> _operationsStarted;
    private readonly Counter<long> _operationsCompleted;
    private readonly Counter<long> _operationsFailed;
    private readonly Histogram<double> _operationDuration;
    private readonly Histogram<long> _recordsProcessed;
    private readonly Counter<long> _cycleErrors;
    private readonly Counter<long> _cyclesCompleted;
    private readonly Histogram<double> _cycleDuration;

    /// <summary>
    /// Initializes a new instance of the <see cref="DataSyncMetrics"/> class.
    /// </summary>
    /// <param name="meterFactory">Factory used to create the meter that owns all instruments.</param>
    /// <exception cref="ArgumentNullException"><paramref name="meterFactory"/> is null.</exception>
    public DataSyncMetrics(IMeterFactory meterFactory)
    {
        ArgumentNullException.ThrowIfNull(meterFactory);

        var meter = meterFactory.Create("JdeScoping.DataSync");

        _operationsStarted = meter.CreateCounter<long>(
            "datasync.operations.started",
            unit: "{operation}",
            description: "Number of sync operations started");
        _operationsCompleted = meter.CreateCounter<long>(
            "datasync.operations.completed",
            unit: "{operation}",
            description: "Number of sync operations completed successfully");
        _operationsFailed = meter.CreateCounter<long>(
            "datasync.operations.failed",
            unit: "{operation}",
            description: "Number of sync operations that failed");
        _operationDuration = meter.CreateHistogram<double>(
            "datasync.operation.duration",
            unit: "s",
            description: "Duration of sync operations in seconds");
        _recordsProcessed = meter.CreateHistogram<long>(
            "datasync.records.processed",
            unit: "{record}",
            description: "Number of records processed per operation");
        _cycleErrors = meter.CreateCounter<long>(
            "datasync.cycle.errors",
            unit: "{error}",
            description: "Number of errors in sync cycles");
        _cyclesCompleted = meter.CreateCounter<long>(
            "datasync.cycles.completed",
            unit: "{cycle}",
            description: "Number of sync cycles completed");
        _cycleDuration = meter.CreateHistogram<double>(
            "datasync.cycle.duration",
            unit: "s",
            description: "Duration of sync cycles in seconds");
    }

    /// <summary>
    /// Records that a sync operation has started.
    /// </summary>
    /// <param name="tableName">Value of the "table" tag.</param>
    /// <param name="updateType">Value of the "type" tag.</param>
    public void RecordOperationStarted(string tableName, string updateType)
    {
        _operationsStarted.Add(1,
            new KeyValuePair<string, object?>("table", tableName),
            new KeyValuePair<string, object?>("type", updateType));
    }

    /// <summary>
    /// Records that a sync operation completed successfully, together with its duration and record count.
    /// </summary>
    /// <param name="tableName">Value of the "table" tag.</param>
    /// <param name="updateType">Value of the "type" tag.</param>
    /// <param name="recordCount">Number of records the operation processed.</param>
    /// <param name="durationSeconds">Duration of the operation in seconds.</param>
    public void RecordOperationCompleted(string tableName, string updateType, long recordCount, double durationSeconds)
    {
        var tags = new KeyValuePair<string, object?>[]
        {
            new("table", tableName),
            new("type", updateType)
        };

        _operationsCompleted.Add(1, tags);
        _operationDuration.Record(durationSeconds, tags);
        _recordsProcessed.Record(recordCount, tags);
    }

    /// <summary>
    /// Records that a sync operation failed.
    /// </summary>
    /// <param name="tableName">Value of the "table" tag.</param>
    /// <param name="updateType">Value of the "type" tag.</param>
    public void RecordOperationFailed(string tableName, string updateType)
    {
        _operationsFailed.Add(1,
            new KeyValuePair<string, object?>("table", tableName),
            new KeyValuePair<string, object?>("type", updateType));
    }

    /// <summary>
    /// Records an error in the sync cycle.
    /// </summary>
    public void RecordCycleError()
    {
        _cycleErrors.Add(1);
    }

    /// <summary>
    /// Records completion of a sync cycle and its duration.
    /// </summary>
    /// <param name="successCount">Number of operations that succeeded in the cycle.</param>
    /// <param name="failedCount">Number of operations that failed in the cycle.</param>
    /// <param name="durationSeconds">Duration of the whole cycle in seconds.</param>
    public void RecordCycleCompleted(int successCount, int failedCount, double durationSeconds)
    {
        // NOTE(review): the counts are attached as tag values, so every distinct count pair creates
        // a separate time series - confirm this is intended before relying on these tags.
        _cyclesCompleted.Add(1,
            new KeyValuePair<string, object?>("success_count", successCount),
            new KeyValuePair<string, object?>("failed_count", failedCount));

        // The duration used to be accepted and then dropped; record it so cycle timing is observable.
        _cycleDuration.Record(durationSeconds);
    }
}