using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JdeScoping.DataSync;

/// <summary>
/// Background service that orchestrates data synchronization from JDE/CMS to SQL Server cache.
/// </summary>
/// <remarks>
/// Each sync cycle runs inside its own dependency-injection scope. A failing cycle is
/// logged and counted via the metrics object, then the service waits for the configured
/// check interval and tries again. Host shutdown (cancellation of the stopping token) is
/// treated as a normal stop, never as an error.
/// </remarks>
public class DataSyncService : BackgroundService
{
    // NOTE(review): the generic type arguments on IOptions / ILogger below, and on the
    // GetRequiredService() calls further down, are not visible in the reviewed copy of
    // this file. They are reproduced exactly as seen - confirm against version control.
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions _options;
    private readonly ILogger _logger;
    private readonly DataSyncMetrics _metrics;

    // Starts at MinValue so the very first sync cycle always attempts a purge.
    private DateTime _lastPurgeCheck = DateTime.MinValue;
    private readonly TimeSpan _purgeCheckInterval = TimeSpan.FromHours(24);

    /// <summary>
    /// Initializes a new instance of the <see cref="DataSyncService"/> class.
    /// </summary>
    /// <param name="scopeFactory">Factory used to create one DI scope per sync cycle.</param>
    /// <param name="options">Service configuration (enabled flag, check interval, purge retention, ...).</param>
    /// <param name="logger">Logger for lifecycle and error messages.</param>
    /// <param name="metrics">Metrics sink used to record failed sync cycles.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public DataSyncService(
        IServiceScopeFactory scopeFactory,
        IOptions options,
        ILogger logger,
        DataSyncMetrics metrics)
    {
        _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>
    /// Runs the synchronization loop until the host requests shutdown.
    /// </summary>
    /// <remarks>
    /// Returns immediately when the service is disabled via configuration. Otherwise it
    /// first closes update entries left open by a prior run, then repeatedly: creates a
    /// scope, executes pending syncs, runs the periodic purge, and waits for the
    /// configured check interval.
    /// </remarks>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    /// <returns>A task that completes when the loop has exited.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Value.Enabled)
        {
            _logger.LogInformation("DataSyncService is disabled via configuration");
            return;
        }

        _logger.LogInformation(
            "DataSyncService starting with CheckInterval={CheckInterval}, MaxDegreeOfParallelism={MaxDegreeOfParallelism}",
            _options.Value.CheckInterval,
            _options.Value.MaxDegreeOfParallelism);

        // Startup: close any interrupted syncs from prior runs
        await CloseOpenUpdateEntriesAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Create scope for this sync cycle
                await using var scope = _scopeFactory.CreateAsyncScope();
                var orchestrator = scope.ServiceProvider.GetRequiredService();

                // Check schedules and execute pending syncs
                await orchestrator.ExecutePendingSyncsAsync(stoppingToken);

                // Periodic purge of old DataUpdate records
                await PurgeUpdateEntriesAsync(scope, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Graceful shutdown
                _logger.LogInformation("DataSyncService stopping gracefully");
                break;
            }
            catch (Exception ex)
            {
                // A failed cycle must not kill the service; record it and retry after the delay
                _logger.LogError(ex, "Error in sync cycle");
                _metrics.RecordCycleError();
            }

            // Wait before next check
            try
            {
                await Task.Delay(_options.Value.CheckInterval, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }

        _logger.LogInformation("DataSyncService stopped");
    }

    /// <summary>
    /// Closes any open update entries from interrupted prior runs.
    /// </summary>
    /// <remarks>
    /// Best effort: failures are logged and swallowed so the service can still start.
    /// Cancellation during shutdown is not reported as a failure.
    /// </remarks>
    /// <param name="cancellationToken">Token signalled when the host is shutting down.</param>
    private async Task CloseOpenUpdateEntriesAsync(CancellationToken cancellationToken)
    {
        try
        {
            await using var scope = _scopeFactory.CreateAsyncScope();
            var repository = scope.ServiceProvider.GetRequiredService();

            var closedCount = await repository.CloseOpenUpdateEntriesAsync(cancellationToken);

            if (closedCount > 0)
            {
                _logger.LogWarning(
                    "Closed {Count} interrupted update entries from prior runs",
                    closedCount);
            }
            else
            {
                _logger.LogDebug("No interrupted update entries found");
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Host is shutting down while startup cleanup was in flight. That is not a
            // failure, so do not log it as one; the main loop in ExecuteAsync sees the
            // cancelled token and exits without starting a sync cycle.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to close open update entries at startup");
            // Continue starting - this is not fatal
        }
    }

    /// <summary>
    /// Purges old DataUpdate records periodically.
    /// </summary>
    /// <remarks>
    /// Does nothing unless at least 24 hours have passed since the last attempt. The
    /// attempt timestamp is updated before the purge runs, so a failed purge is not
    /// retried until the next interval elapses. Failures are logged and swallowed;
    /// cancellation during shutdown is propagated to the caller instead.
    /// </remarks>
    /// <param name="scope">Scope of the current sync cycle, used to resolve the repository.</param>
    /// <param name="cancellationToken">Token signalled when the host is shutting down.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled while the purge is running.
    /// </exception>
    private async Task PurgeUpdateEntriesAsync(AsyncServiceScope scope, CancellationToken cancellationToken)
    {
        if (DateTime.UtcNow - _lastPurgeCheck < _purgeCheckInterval)
        {
            return;
        }

        _lastPurgeCheck = DateTime.UtcNow;

        try
        {
            var repository = scope.ServiceProvider.GetRequiredService();
            var purgedCount = await repository.PurgeOldEntriesAsync(
                _options.Value.PurgeRetentionDays,
                cancellationToken);

            if (purgedCount > 0)
            {
                _logger.LogInformation(
                    "Purged {Count} DataUpdate records older than {Days} days",
                    purgedCount,
                    _options.Value.PurgeRetentionDays);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown, not a purge failure: rethrow so ExecuteAsync's graceful-stop
            // handler runs instead of logging a misleading error here.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to purge old update entries");
            // Continue - this is not fatal
        }
    }
}