# Data Sync Service Design

## Overview

This document describes the architecture and implementation patterns for the data synchronization background service.

## Component Architecture

```
JdeScoping.DataSync/
├── DataSyncService.cs              # BackgroundService implementation
├── IDataFetcher.cs                 # Generic fetcher interface
├── IPostProcessor.cs               # Post-processing interface
├── DataSyncOptions.cs              # Configuration options
├── DataSourceConfig.cs             # Per-table configuration
├── ScheduleChecker.cs              # Schedule evaluation logic
├── SyncOrchestrator.cs             # Coordinates parallel sync operations
├── TableSyncOperation.cs           # Single table sync execution
├── StagingTableManager.cs          # Temp table creation and MERGE
├── DataSyncHealthCheck.cs          # IHealthCheck implementation
├── DataSyncMetrics.cs              # Metrics and telemetry
├── ServiceCollectionExtensions.cs  # AddDataSync registration
└── Fetchers/                       # IDataFetcher implementations
    ├── JdeWorkOrderFetcher.cs
    ├── JdeLotUsageFetcher.cs
    ├── JdeItemFetcher.cs
    └── ...
```

## BackgroundService Pattern

### ExecuteAsync Implementation

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class DataSyncService : BackgroundService
{
    // Assigned via constructor injection (constructor omitted for brevity)
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions<DataSyncOptions> _options;
    private readonly ILogger<DataSyncService> _logger;
    private readonly DataSyncMetrics _metrics;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Startup: close any interrupted syncs from prior runs
        await CloseOpenUpdateEntriesAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Create a DI scope for this sync cycle
                await using var scope = _scopeFactory.CreateAsyncScope();
                var orchestrator = scope.ServiceProvider
                    .GetRequiredService<ISyncOrchestrator>();

                // Check schedules and execute pending syncs
                await orchestrator.ExecutePendingSyncsAsync(stoppingToken);

                // Periodic purge of old DataUpdate records
                await PurgeUpdateEntriesAsync(scope, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Graceful shutdown
                break;
            }
            catch (Exception ex)
            {
                // Log and keep the loop alive; the next cycle retries
                _logger.LogError(ex, "Error in sync cycle");
            }

            // Wait before the next check; cancellation during the wait also means shutdown
            try
            {
                await Task.Delay(_options.Value.CheckInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}
```

### Graceful Shutdown

- The `CancellationToken` propagates to all child operations.
- `Parallel.ForEachAsync` respects the cancellation token and starts no new operations once shutdown begins.
- In-progress operations either complete their current batch or cancel gracefully.
- Interrupted syncs are marked as failed with `WasSuccessful = false`. Any left open by an abrupt process exit are closed by startup recovery.

## IDataFetcher Interface

### Interface Definition

```csharp
public interface IDataFetcher<TEntity> where TEntity : class
{
    /// <summary>
    /// Fetches entities from the source system as an async stream.
    /// </summary>
    /// <param name="minimumDT">For incremental fetches, only return records modified after this time. Null for a full fetch.</param>
    /// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
    /// <returns>Async enumerable of entities, streamed from the source.</returns>
    IAsyncEnumerable<TEntity> FetchAsync(
        DateTime? minimumDT,
        CancellationToken cancellationToken = default);
}
```

### Fetcher Resolution

Fetchers are registered in DI by convention:

```csharp
// Entity type names are inferred from the fetcher names
services.AddScoped<IDataFetcher<WorkOrder>, JdeWorkOrderFetcher>();
services.AddScoped<IDataFetcher<LotUsage>, JdeLotUsageFetcher>();
services.AddScoped<IDataFetcher<Item>, JdeItemFetcher>();
// ... etc
```

The configuration references each fetcher by its type name:

```json
{
  "DataSync": {
    "DataSources": [
      {
        "TableName": "WorkOrder_Curr",
        "SourceSystem": "JDE",
        "FetcherTypeName": "JdeWorkOrderFetcher",
        "IsEnabled": true,
        "MassConfig": {
          "Enabled": true,
          "IntervalMinutes": 10080,
          "PrepurgeData": true
        },
        "DailyConfig": {
          "Enabled": true,
          "IntervalMinutes": 1440
        },
        "HourlyConfig": {
          "Enabled": true,
          "IntervalMinutes": 60
        }
      }
    ]
  }
}
```

At startup, each `FetcherTypeName` is validated and resolved to a registered `IDataFetcher<TEntity>`.
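Here is one way a fetcher implementing the interface above could look. The sketch assumes a hypothetical `WorkOrder` entity and a hypothetical `InMemoryWorkOrderFetcher`, with an in-memory list standing in for the real JDE query. It shows the two behaviors the contract asks for: honoring `minimumDT` for incremental fetches, and observing the cancellation token while streaming. In an async iterator, the token parameter needs `[EnumeratorCancellation]` so that a token supplied through `WithCancellation(...)` also reaches the method body.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Same contract as above, repeated so the sketch compiles on its own.
public interface IDataFetcher<TEntity> where TEntity : class
{
    IAsyncEnumerable<TEntity> FetchAsync(
        DateTime? minimumDT,
        CancellationToken cancellationToken = default);
}

// Hypothetical entity; the real JDE work order columns are not specified here.
public sealed record WorkOrder(string OrderNumber, DateTime LastUpdateDT);

// Hypothetical fetcher: an in-memory list stands in for the JDE query.
public sealed class InMemoryWorkOrderFetcher : IDataFetcher<WorkOrder>
{
    private readonly IReadOnlyList<WorkOrder> _source;

    public InMemoryWorkOrderFetcher(IReadOnlyList<WorkOrder> source) => _source = source;

    public async IAsyncEnumerable<WorkOrder> FetchAsync(
        DateTime? minimumDT,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (var row in _source)
        {
            // Stop promptly when the host is shutting down.
            cancellationToken.ThrowIfCancellationRequested();

            // Incremental fetch: only rows modified after minimumDT.
            // A null minimumDT means a full fetch.
            if (minimumDT is { } min && row.LastUpdateDT <= min)
                continue;

            // Stand-in for awaiting the next row from a real data reader.
            await Task.Yield();
            yield return row;
        }
    }
}
```

Because rows are yielded one at a time, the consumer's batching loop controls memory use, not the fetcher.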
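The startup validation of `FetcherTypeName` could be sketched with plain reflection. This version assumes that a match on the simple type name plus an `IDataFetcher<TEntity>` implementation is enough. `FetcherResolver`, `ResolveEntityType`, `Item` and `SampleItemFetcher` are all hypothetical names, and the real service may resolve through the DI container instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public interface IDataFetcher<TEntity> where TEntity : class
{
    IAsyncEnumerable<TEntity> FetchAsync(
        DateTime? minimumDT,
        CancellationToken cancellationToken = default);
}

public static class FetcherResolver
{
    // Maps a configured FetcherTypeName to the TEntity of the IDataFetcher<TEntity>
    // it implements. Throws on a missing, ambiguous, or non-fetcher type so that a
    // bad configuration stops the host at startup instead of failing on first sync.
    public static Type ResolveEntityType(string fetcherTypeName, IEnumerable<Type> candidateTypes)
    {
        var matches = candidateTypes
            .Where(t => t.IsClass && !t.IsAbstract && t.Name == fetcherTypeName)
            .ToList();

        if (matches.Count != 1)
            throw new InvalidOperationException(
                $"FetcherTypeName '{fetcherTypeName}' matched {matches.Count} types; expected 1.");

        var fetcherInterface = matches[0].GetInterfaces().FirstOrDefault(i =>
            i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IDataFetcher<>));

        return fetcherInterface?.GetGenericArguments()[0]
            ?? throw new InvalidOperationException(
                $"'{fetcherTypeName}' does not implement IDataFetcher<TEntity>.");
    }
}

// Hypothetical entity and fetcher, used only to exercise the resolver.
public sealed class Item { }

public sealed class SampleItemFetcher : IDataFetcher<Item>
{
    public async IAsyncEnumerable<Item> FetchAsync(
        DateTime? minimumDT,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await Task.CompletedTask;
        yield break;
    }
}
```

In the service, `candidateTypes` could be the types of the fetcher assembly, for example `typeof(SampleItemFetcher).Assembly.GetTypes()`. The resolved entity type can then build the service type with `typeof(IDataFetcher<>).MakeGenericType(entityType)` for a container lookup.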
## Configuration Classes

### DataSyncOptions

```csharp
public class DataSyncOptions
{
    public const string SectionName = "DataSync";

    /// <summary>Time between schedule checks (default: 1 minute)</summary>
    public TimeSpan CheckInterval { get; set; } = TimeSpan.FromMinutes(1);

    /// <summary>Maximum parallel sync operations (default: 8)</summary>
    public int MaxDegreeOfParallelism { get; set; } = 8;

    /// <summary>Records per batch for streaming (default: 1,000,000)</summary>
    public int BatchSize { get; set; } = 1_000_000;

    /// <summary>Rows per bulk copy batch (default: 10,000)</summary>
    public int BulkCopyBatchSize { get; set; } = 10_000;

    /// <summary>Multiplier for lookback window (default: 3)</summary>
    public int LookbackMultiplier { get; set; } = 3;

    /// <summary>Days to retain DataUpdate history (default: 30)</summary>
    public int PurgeRetentionDays { get; set; } = 30;

    /// <summary>Per-table data source configurations</summary>
    public List<DataSourceConfig> DataSources { get; set; } = new();
}
```

### DataSourceConfig

```csharp
public class DataSourceConfig
{
    /// <summary>Target table name in SQL Server cache</summary>
    public required string TableName { get; set; }

    /// <summary>Source system: "JDE" or "CMS"</summary>
    public required string SourceSystem { get; set; }

    /// <summary>Name of IDataFetcher implementation type</summary>
    public required string FetcherTypeName { get; set; }

    /// <summary>Optional IPostProcessor implementation type name</summary>
    public string? PostProcessorTypeName { get; set; }

    /// <summary>Whether this data source is enabled for sync</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Mass sync schedule configuration</summary>
    public ScheduleConfig MassConfig { get; set; } = new();

    /// <summary>Daily incremental sync configuration</summary>
    public ScheduleConfig DailyConfig { get; set; } = new();

    /// <summary>Hourly incremental sync configuration</summary>
    public ScheduleConfig HourlyConfig { get; set; } = new();
}

public class ScheduleConfig
{
    public bool Enabled { get; set; } = true;
    public int IntervalMinutes { get; set; }
    public bool PrepurgeData { get; set; } = false;
    public bool ReIndexData { get; set; } = false;
}
```

## Parallel Sync Execution

### Parallel.ForEachAsync Pattern

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

public class SyncOrchestrator : ISyncOrchestrator
{
    // _scheduleChecker, _options and _scopeFactory are constructor-injected (omitted for brevity)

    public async Task ExecutePendingSyncsAsync(CancellationToken cancellationToken)
    {
        var pendingTasks = await _scheduleChecker.GetPendingTasksAsync(cancellationToken);
        if (pendingTasks.Count == 0) return;

        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = _options.Value.MaxDegreeOfParallelism,
            CancellationToken = cancellationToken
        };

        await Parallel.ForEachAsync(pendingTasks, parallelOptions, async (task, ct) =>
        {
            // Each task gets its own scope
            await using var scope = _scopeFactory.CreateAsyncScope();
            var operation = scope.ServiceProvider
                .GetRequiredService<ITableSyncOperation>();

            try
            {
                await operation.ExecuteAsync(task, ct);
            }
            catch (Exception) when (!cancellationToken.IsCancellationRequested)
            {
                // TableSyncOperation has already logged the failure and marked the
                // DataUpdate row as failed. Swallowing it here stops Parallel.ForEachAsync
                // from cancelling the other in-flight table syncs.
            }
        });
    }
}
```

### Isolation Requirements

- Each parallel sync operation creates its own `IServiceScope`.
- Each operation uses its own SQL connection from the scoped `DbContext` or connection factory.
- Staging tables use unique suffixes: `#Staging{TableName}_{OperationId}`.
- No mutable state is shared between parallel operations.

## Staging Table Management

### Naming Convention

```
#Staging{TableName}_{OperationId}   - Bulk copy destination
#{TableName}_{OperationId}          - Deduplicated temp table for MERGE
```

Here `OperationId` is a GUID or sequential ID unique to each sync operation.

### MERGE Operation Flow

1. **Create staging table** matching the destination schema, with a unique suffix.
2. **Bulk copy** source data to the staging table (batched at `BulkCopyBatchSize`, default 10,000 rows).
3. **Deduplicate** into a temp table using `ROW_NUMBER() OVER (PARTITION BY PK ORDER BY LastUpdateDT DESC)`, keeping only the first row per key.
4. **MERGE** from the temp table into the destination:
   - INSERT new records (not matched by primary key)
   - UPDATE existing records WHERE `source.LastUpdateDT > target.LastUpdateDT`
5. **Clean up** the staging and temp tables.

### Mass Update with Truncation

For mass updates with `PrepurgeData = true`:

1. **Disable non-PK indexes** on the destination table.
2. **TRUNCATE** the destination table.
3. **Bulk copy** directly to the destination (no staging needed).
4. **Rebuild indexes** if `ReIndexData = true`.
5. **Update statistics**.

### Batching Large Datasets

Fetched records are buffered in batches of `BatchSize` (default 1,000,000). At most one batch is held in memory at a time, however many records the fetch streams:

```csharp
int batchNumber = 0;
var batch = new List<TEntity>(_options.Value.BatchSize);

await foreach (var entity in fetcher.FetchAsync(minimumDT, ct))
{
    batch.Add(entity);
    if (batch.Count >= _options.Value.BatchSize)
    {
        await ProcessBatchAsync(batch, operationId, batchNumber++, ct);
        batch.Clear();
    }
}

// Process remaining records
if (batch.Count > 0)
{
    await ProcessBatchAsync(batch, operationId, batchNumber, ct);
}
```

## Update Logging

### DataUpdate Record Lifecycle

```
Start:    NumberRecords = -2 (in-progress marker)
              |
              v
Success:  NumberRecords = actual count, WasSuccessful = true, EndDT = now
   OR
Failure:  NumberRecords = -1, WasSuccessful = false, EndDT = now
```

### Logging with Scope

```csharp
public async Task ExecuteAsync(DataUpdateTask task, CancellationToken ct)
{
    using var logScope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["TableName"] = task.TableName,
        ["UpdateType"] = task.UpdateType,
        ["OperationId"] = task.OperationId
    });

    var updateId = await _repository.StartUpdateAsync(task, ct);

    try
    {
        var recordCount = await ExecuteSyncAsync(task, ct);
        await _repository.CompleteUpdateAsync(updateId, recordCount, success: true, ct);
        _logger.LogInformation("Sync completed: {RecordCount} records", recordCount);
    }
    catch (Exception ex)
    {
        // Use CancellationToken.None: on shutdown `ct` is already cancelled, and the
        // failure must still be recorded rather than left at -2 until the next startup
        await _repository.CompleteUpdateAsync(updateId, -1, success: false, CancellationToken.None);
        _logger.LogError(ex, "Sync failed");
        throw;
    }
}
```

### Startup Recovery

At startup, `CloseOpenUpdateEntriesAsync()` closes any records still marked as in progress (`NumberRecords = -2`):

```sql
UPDATE DataUpdate
SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1
WHERE NumberRecords = -2
```

## Health Checks

### IHealthCheck Implementation

```csharp
using Microsoft.Extensions.Diagnostics.HealthChecks;

public class DataSyncHealthCheck : IHealthCheck
{
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var statuses = await GetTableStatusesAsync(cancellationToken);
        var data = new Dictionary<string, object>();

        foreach (var status in statuses)
        {
            data[$"{status.TableName}_LastSync"] = status.LastSyncTime?.ToString("O") ?? "Never";
            data[$"{status.TableName}_Status"] = status.IsOverdue ? "Overdue" : "Current";
        }

        var overdueCount = statuses.Count(s => s.IsOverdue);
        var failedCount = statuses.Count(s => s.RecentFailures > 0);

        if (failedCount > 0)
            return HealthCheckResult.Unhealthy(
                $"{failedCount} tables with recent sync failures", data: data);

        if (overdueCount > 0)
            return HealthCheckResult.Degraded(
                $"{overdueCount} tables overdue for sync", data: data);

        return HealthCheckResult.Healthy("All syncs current", data: data);
    }
}
```

## Telemetry

### Metrics

```csharp
using System.Diagnostics.Metrics;

public class DataSyncMetrics
{
    private readonly Meter _meter;
    private readonly Counter<long> _operationsStarted;
    private readonly Counter<long> _operationsCompleted;
    private readonly Counter<long> _operationsFailed;
    private readonly Histogram<double> _operationDuration;
    private readonly Histogram<long> _recordsProcessed;

    public DataSyncMetrics(IMeterFactory meterFactory)
    {
        _meter = meterFactory.Create("DataSync");
        _operationsStarted = _meter.CreateCounter<long>("sync.operations.started");
        _operationsCompleted = _meter.CreateCounter<long>("sync.operations.completed");
        _operationsFailed = _meter.CreateCounter<long>("sync.operations.failed");
        _operationDuration = _meter.CreateHistogram<double>("sync.duration.seconds");
        _recordsProcessed = _meter.CreateHistogram<long>("sync.records.processed");
    }

    public void RecordOperationStarted(string tableName, string updateType)
    {
        _operationsStarted.Add(1,
            new KeyValuePair<string, object?>("table", tableName),
            new KeyValuePair<string, object?>("type", updateType));
    }

    // ... similar for completed, failed, duration, records
}
```

### Activity Tracing

```csharp
using System.Diagnostics;

public static class DataSyncActivitySource
{
    public static readonly ActivitySource Source = new("DataSync");

    public static Activity? StartSyncOperation(string tableName, string updateType)
    {
        return Source.StartActivity("SyncTable", ActivityKind.Internal)?
            .SetTag("table.name", tableName)
            .SetTag("update.type", updateType);
    }
}
```

## DI Registration

### AddDataSync Extension

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddDataSync(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        // Bind configuration
        services.Configure<DataSyncOptions>(
            configuration.GetSection(DataSyncOptions.SectionName));

        // Register core services
        services.AddHostedService<DataSyncService>();
        services.AddScoped<ISyncOrchestrator, SyncOrchestrator>();
        services.AddScoped<IScheduleChecker, ScheduleChecker>();
        services.AddScoped<ITableSyncOperation, TableSyncOperation>();
        services.AddScoped<IStagingTableManager, StagingTableManager>();

        // Register health check
        services.AddHealthChecks()
            .AddCheck<DataSyncHealthCheck>("data-sync");

        // Register metrics
        services.AddSingleton<DataSyncMetrics>();

        // Register fetchers (entity type names inferred from the fetcher names)
        services.AddScoped<IDataFetcher<WorkOrder>, JdeWorkOrderFetcher>();
        services.AddScoped<IDataFetcher<LotUsage>, JdeLotUsageFetcher>();
        // ... etc

        // Validate configuration at startup
        services.AddOptions<DataSyncOptions>()
            .ValidateDataAnnotations()
            .ValidateOnStart();

        return services;
    }
}
```

## Schedule Checking Logic

### Priority: Mass > Daily > Hourly

```csharp
public async Task<IReadOnlyList<DataUpdateTask>> GetPendingTasksAsync(CancellationToken ct)
{
    var lastUpdates = await _repository.GetLastDataUpdatesAsync(ct);
    var tasks = new List<DataUpdateTask>();
    var now = DateTime.UtcNow; // one timestamp for the whole pass

    foreach (var config in _options.Value.DataSources.Where(c => c.IsEnabled))
    {
        var lastSync = lastUpdates.GetValueOrDefault(config.TableName);

        // Check Mass first (highest priority)
        if (config.MassConfig.Enabled && NeedsMassSync(config, lastSync, now))
        {
            tasks.Add(CreateMassTask(config));
            continue; // Skip daily/hourly checks
        }

        // Check Daily
        if (config.DailyConfig.Enabled && NeedsDailySync(config, lastSync, now))
        {
            tasks.Add(CreateDailyTask(config, lastSync));
            continue;
        }

        // Check Hourly
        if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastSync, now))
        {
            tasks.Add(CreateHourlyTask(config, lastSync));
        }
    }

    return tasks;
}
```

### MinimumDT Calculation

For Daily updates:

```
MinimumDT = LastDailyUpdateDT - (LookbackMultiplier * DailyInterval)
```

For Hourly updates (uses the Daily timestamp, not the Hourly one):

```
MinimumDT = LastDailyUpdateDT - (LookbackMultiplier * DailyInterval)
```

## File Structure

```
NEW/src/
├── JdeScoping.DataSync/
│   ├── JdeScoping.DataSync.csproj
│   ├── DataSyncService.cs
│   ├── Configuration/
│   │   ├── DataSyncOptions.cs
│   │   └── DataSourceConfig.cs
│   ├── Contracts/
│   │   ├── IDataFetcher.cs
│   │   ├── IPostProcessor.cs
│   │   ├── ISyncOrchestrator.cs
│   │   ├── IScheduleChecker.cs
│   │   ├── ITableSyncOperation.cs
│   │   └── IStagingTableManager.cs
│   ├── Services/
│   │   ├── SyncOrchestrator.cs
│   │   ├── ScheduleChecker.cs
│   │   ├── TableSyncOperation.cs
│   │   └── StagingTableManager.cs
│   ├── Fetchers/
│   │   ├── Jde/
│   │   │   ├── JdeWorkOrderFetcher.cs
│   │   │   ├── JdeLotUsageFetcher.cs
│   │   │   └── ...
│   │   └── Cms/
│   │       └── CmsMisDataFetcher.cs
│   ├── HealthChecks/
│   │   └── DataSyncHealthCheck.cs
│   ├── Telemetry/
│   │   ├── DataSyncMetrics.cs
│   │   └── DataSyncActivitySource.cs
│   └── DependencyInjection/
│       └── ServiceCollectionExtensions.cs
└── JdeScoping.Host/
    └── Program.cs   (add: builder.Services.AddDataSync(builder.Configuration))
```
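Steps 3 and 4 of the MERGE Operation Flow can be sketched as a small SQL builder that `StagingTableManager` might use. Several parts of this are assumptions for illustration: the `MergeSqlBuilder` type, its parameters, a single-column primary key, and a `columns` list that includes both the key and `LastUpdateDT`. Identifiers are bracket-quoted and are assumed to come from trusted configuration, never user input.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MergeSqlBuilder
{
    // Hypothetical helper: builds T-SQL that deduplicates the staging table into a
    // temp table (keeping the newest row per key) and MERGEs it into the destination.
    public static string Build(
        string tableName, string operationId, string keyColumn, IReadOnlyList<string> columns)
    {
        // Bracket-quote an identifier, escaping any closing bracket.
        string Q(string name) => "[" + name.Replace("]", "]]") + "]";

        var staging = Q($"#Staging{tableName}_{operationId}");
        var temp = Q($"#{tableName}_{operationId}");
        var columnList = string.Join(", ", columns.Select(Q));
        var updateSet = string.Join(", ", columns
            .Where(c => c != keyColumn)
            .Select(c => $"target.{Q(c)} = source.{Q(c)}"));
        var insertValues = string.Join(", ", columns.Select(c => $"source.{Q(c)}"));

        return $@"
-- Step 3: keep only the most recent row per primary key.
SELECT {columnList}
INTO {temp}
FROM (
    SELECT {columnList},
           ROW_NUMBER() OVER (PARTITION BY {Q(keyColumn)} ORDER BY [LastUpdateDT] DESC) AS rn
    FROM {staging}
) AS ranked
WHERE rn = 1;

-- Step 4: insert new keys; update existing keys only when the source row is newer.
MERGE {Q(tableName)} AS target
USING {temp} AS source
    ON target.{Q(keyColumn)} = source.{Q(keyColumn)}
WHEN MATCHED AND source.[LastUpdateDT] > target.[LastUpdateDT] THEN
    UPDATE SET {updateSet}
WHEN NOT MATCHED BY TARGET THEN
    INSERT ({columnList}) VALUES ({insertValues});";
    }
}
```

The `WHEN MATCHED AND source.[LastUpdateDT] > target.[LastUpdateDT]` guard is what makes overlapping lookback windows safe. Re-fetched rows that are not newer than the cached copy are left untouched.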
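The priority rule and the MinimumDT formula from the Schedule Checking Logic section can also be expressed as pure functions, which makes them easy to unit-test. The `UpdateType` enum, the `ScheduleState` record and `ScheduleRules` are hypothetical names. The "due" rule is an assumption, because the document does not spell out `NeedsMassSync` and its siblings: a schedule counts as due when it has never run, or when its interval has elapsed since it last ran. MinimumDT follows the formulas exactly as written, so Daily and Hourly runs both use the last Daily timestamp and the Daily interval.

```csharp
using System;

public enum UpdateType { None, Mass, Daily, Hourly }

// Hypothetical per-schedule state: enabled flag, interval, and when that
// update type last ran (null if never).
public readonly record struct ScheduleState(bool Enabled, int IntervalMinutes, DateTime? LastRun);

public static class ScheduleRules
{
    // Assumed due rule: enabled, and either never ran or the interval has elapsed.
    public static bool IsDue(ScheduleState s, DateTime now) =>
        s.Enabled &&
        (s.LastRun is null || now - s.LastRun.Value >= TimeSpan.FromMinutes(s.IntervalMinutes));

    // Mass > Daily > Hourly: the first due schedule wins, mirroring the
    // `continue` statements in GetPendingTasksAsync.
    public static UpdateType Pick(ScheduleState mass, ScheduleState daily, ScheduleState hourly, DateTime now) =>
        IsDue(mass, now) ? UpdateType.Mass
        : IsDue(daily, now) ? UpdateType.Daily
        : IsDue(hourly, now) ? UpdateType.Hourly
        : UpdateType.None;

    // MinimumDT = LastDailyUpdateDT - (LookbackMultiplier * DailyInterval).
    // Assumption: with no prior Daily run the result is null, i.e. a full fetch.
    public static DateTime? MinimumDT(DateTime? lastDailyUpdateDT, int lookbackMultiplier, int dailyIntervalMinutes) =>
        lastDailyUpdateDT?.AddMinutes(-(double)lookbackMultiplier * dailyIntervalMinutes);
}
```

With the defaults (`LookbackMultiplier = 3`, Daily `IntervalMinutes = 1440`), each incremental run re-reads three days of changes before the last Daily sync.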