diff --git a/NEW/src/JdeScoping.Api/Services/SearchNotificationService.cs b/NEW/src/JdeScoping.Api/Services/SearchNotificationService.cs new file mode 100644 index 0000000..4c63e81 --- /dev/null +++ b/NEW/src/JdeScoping.Api/Services/SearchNotificationService.cs @@ -0,0 +1,69 @@ +using JdeScoping.Api.Hubs; +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Infrastructure; +using JdeScoping.Core.Models.Search; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.Api.Services; + +/// +/// SignalR-based implementation of search notification service. +/// Sends real-time updates to connected clients via StatusHub. +/// +public class SearchNotificationService : ISearchNotificationService +{ + private readonly IHubContext _hubContext; + private readonly ILogger _logger; + + /// + /// Initializes a new instance of SearchNotificationService. + /// + /// SignalR hub context for StatusHub. + /// Logger instance. + public SearchNotificationService( + IHubContext hubContext, + ILogger logger) + { + _hubContext = hubContext; + _logger = logger; + } + + /// + public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default) + { + try + { + var update = new SearchUpdate(search); + await _hubContext.Clients.All.SendAsync("searchUpdate", update, ct); + _logger.LogDebug( + "Search update notification sent: Id={SearchId}, Status={Status}", + search.Id, + search.Status); + } + catch (Exception ex) + { + // Best-effort notification - log but don't throw + _logger.LogWarning( + ex, + "Failed to send search update notification for SearchId={SearchId}", + search.Id); + } + } + + /// + public async Task NotifyStatusAsync(string status, CancellationToken ct = default) + { + try + { + var update = new StatusUpdate(status); + await _hubContext.Clients.All.SendAsync("statusUpdate", update, ct); + _logger.LogDebug("Status notification sent: {Status}", status); + } + catch (Exception ex) + { + // Best-effort notification - log but don't throw + _logger.LogWarning(ex, "Failed to send status notification: {Status}", status); + } + } +} diff --git a/NEW/src/JdeScoping.DataSync/Services/SearchExecutionService.cs b/NEW/src/JdeScoping.DataSync/Services/SearchExecutionService.cs new file mode 100644 index 0000000..146bca2 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Services/SearchExecutionService.cs @@ -0,0 +1,131 @@ +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.Core.Models.SearchResults; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JdeScoping.DataSync.Services; + +/// +/// Service that orchestrates the complete search execution pipeline. +/// +public class SearchExecutionService : ISearchExecutionService +{ + private readonly ISearchRepository _searchRepository; + private readonly ISearchProcessor _searchProcessor; + private readonly IExcelExportService _excelExportService; + private readonly ISearchNotificationService _notificationService; + private readonly WorkProcessorOptions _options; + private readonly ILogger _logger; + + public SearchExecutionService( + ISearchRepository searchRepository, + ISearchProcessor searchProcessor, + IExcelExportService excelExportService, + ISearchNotificationService notificationService, + IOptions options, + ILogger logger) + { + _searchRepository = searchRepository ?? throw new ArgumentNullException(nameof(searchRepository)); + _searchProcessor = searchProcessor ?? throw new ArgumentNullException(nameof(searchProcessor)); + _excelExportService = excelExportService ?? throw new ArgumentNullException(nameof(excelExportService)); + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default) + { + using var timeoutCts = new CancellationTokenSource(_options.SearchTimeout); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token); + + try + { + // Mark as Running + await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token); + search.Status = SearchStatus.Running; + search.StartDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, linkedCts.Token); + + // Execute search query + var model = new SearchModel { Id = search.Id }; + await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token); + + // Generate Excel + var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token); + + // Complete with success + await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token); + search.Status = SearchStatus.Ended; + search.EndDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, linkedCts.Token); + + _logger.LogInformation( + "Search {SearchId} completed successfully with {ResultCount} results", + search.Id, model.Results.Count); + } + catch (OperationCanceledException) + { + // Distinguish between host shutdown and timeout using the CancellationTokenSource states + // - Host shutdown: original ct is cancelled, but timeout hasn't fired + // - Timeout: timeoutCts is cancelled (regardless of ct state) + if (ct.IsCancellationRequested && !timeoutCts.IsCancellationRequested) + { + // Host shutdown - don't mark as error, let startup cleanup handle requeue + _logger.LogInformation( + "Search {SearchId} interrupted by shutdown, will be requeued on restart", + search.Id); + } + else if (timeoutCts.IsCancellationRequested) + { + // Timeout - mark as error + _logger.LogWarning( + "Search {SearchId} timed out after {Timeout}", + search.Id, _options.SearchTimeout); + await CompleteWithErrorAsync(search); + } + else + { + // Unknown cancellation source - treat as error for safety + _logger.LogWarning("Search {SearchId} cancelled from unknown source", search.Id); + await CompleteWithErrorAsync(search); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Search {SearchId} failed", search.Id); + await CompleteWithErrorAsync(search); + } + } + + private async Task CompleteWithErrorAsync(Search search) + { + try + { + await _searchRepository.CompleteSearchAsync(search.Id, false, null, CancellationToken.None); + search.Status = SearchStatus.Error; + search.EndDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id); + } + } + + private async Task NotifySearchUpdateSafeAsync(Search search, CancellationToken ct) + { + try + { + await _notificationService.NotifySearchUpdateAsync(search, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id); + } + } +} diff --git a/NEW/src/JdeScoping.DataSync/Services/SearchRepository.cs b/NEW/src/JdeScoping.DataSync/Services/SearchRepository.cs new file mode 100644 index 0000000..45cbd61 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Services/SearchRepository.cs @@ -0,0 +1,136 @@ +using Dapper; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Contracts; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.DataSync.Services; + +/// +/// Repository for Search table operations. +/// +public class SearchRepository : ISearchRepository +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// The database connection factory. + /// The logger instance. + public SearchRepository( + IDbConnectionFactory connectionFactory, + ILogger logger) + { + _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task GetNextQueuedSearchAsync(CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + SELECT TOP 1 + Id, UserName, Name, Status, SubmitDT as SubmitDt, + StartDT as StartDt, EndDT as EndDt, CriteriaJSON as CriteriaJson + FROM dbo.Search + WHERE Status = @Status + ORDER BY SubmitDT ASC + """; + + var search = await connection.QueryFirstOrDefaultAsync( + sql, + new { Status = (int)SearchStatus.Queued }, + commandTimeout: 30); + + if (search != null) + { + _logger.LogDebug("Found queued search {SearchId}", search.Id); + } + + return search; + } + + /// + public async Task ResetPartialSearchesAsync(CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @QueuedStatus, StartDT = NULL + WHERE Status = @RunningStatus AND EndDT IS NULL + """; + + var count = await connection.ExecuteAsync( + sql, + new + { + QueuedStatus = (int)SearchStatus.Queued, + RunningStatus = (int)SearchStatus.Running + }, + commandTimeout: 30); + + if (count > 0) + { + _logger.LogWarning("Reset {Count} partial searches to Queued status", count); + } + + return count; + } + + /// + public async Task StartSearchAsync(int searchId, CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @Status, StartDT = @StartDt + WHERE Id = @SearchId + """; + + await connection.ExecuteAsync( + sql, + new + { + SearchId = searchId, + Status = (int)SearchStatus.Running, + StartDt = DateTime.UtcNow + }, + commandTimeout: 30); + + _logger.LogDebug("Started search {SearchId}", searchId); + } + + /// + public async Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @Status, EndDT = @EndDt, Results = @Results + WHERE Id = @SearchId + """; + + var status = success ? SearchStatus.Ended : SearchStatus.Error; + + await connection.ExecuteAsync( + sql, + new + { + SearchId = searchId, + Status = (int)status, + EndDt = DateTime.UtcNow, + Results = results + }, + commandTimeout: 30); + + _logger.LogDebug("Completed search {SearchId} with status {Status}", searchId, status); + } +} diff --git a/NEW/src/JdeScoping.DataSync/WorkProcessor.cs b/NEW/src/JdeScoping.DataSync/WorkProcessor.cs new file mode 100644 index 0000000..1ec43d8 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/WorkProcessor.cs @@ -0,0 +1,223 @@ +using JdeScoping.Core.Interfaces; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JdeScoping.DataSync; + +/// +/// Unified background service that coordinates data synchronization and search processing. +/// Data freshness takes priority over search processing. +/// +public class WorkProcessor : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly WorkProcessorOptions _options; + private readonly ILogger _logger; + private readonly DataSyncMetrics _metrics; + + private DateTime _lastPurgeCheck = DateTime.MinValue; + private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24); + + /// + /// Initializes a new instance of the class. + /// + public WorkProcessor( + IServiceScopeFactory scopeFactory, + IOptions options, + ILogger logger, + DataSyncMetrics metrics) + { + _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); + } + + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!_options.Enabled) + { + _logger.LogInformation("WorkProcessor is disabled"); + return; + } + + _logger.LogInformation( + "WorkProcessor starting with WorkInterval={WorkInterval}", + _options.WorkInterval); + + // Startup cleanup + await StartupCleanupAsync(stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + string status = "Idle"; + try + { + status = await DoWorkAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + _logger.LogInformation("WorkProcessor stopping gracefully"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in work cycle"); + _metrics.RecordCycleError(); + } + finally + { + // Always notify status (best-effort) + await NotifyStatusSafeAsync(status, stoppingToken); + } + + try + { + await Task.Delay(_options.WorkInterval, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + } + + await NotifyStatusSafeAsync("Stopped", CancellationToken.None); + _logger.LogInformation("WorkProcessor stopped"); + } + + /// + /// Performs startup cleanup by closing interrupted DataUpdate entries + /// and resetting partial searches. + /// + private async Task StartupCleanupAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Close interrupted DataUpdate entries + try + { + var dataUpdateRepo = scope.ServiceProvider.GetRequiredService(); + var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct); + if (closedCount > 0) + { + _logger.LogWarning("Closed {Count} interrupted data update entries", closedCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to close open data update entries"); + } + + // Reset partial searches + try + { + var searchRepo = scope.ServiceProvider.GetRequiredService(); + var resetCount = await searchRepo.ResetPartialSearchesAsync(ct); + if (resetCount > 0) + { + _logger.LogWarning("Reset {Count} partial searches to Queued", resetCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to reset partial searches"); + } + } + + /// + /// Performs one work cycle: data syncs have priority, then search processing. + /// + private async Task DoWorkAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Priority 1: Data syncs + var scheduleChecker = scope.ServiceProvider.GetRequiredService(); + var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct); + + if (pendingTasks.Count > 0) + { + await NotifyStatusSafeAsync("Updating data cache", ct); + + var orchestrator = scope.ServiceProvider.GetRequiredService(); + await orchestrator.ExecutePendingSyncsAsync(ct); + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + // Priority 2: Search processing (only when syncs are current) + var searchRepository = scope.ServiceProvider.GetRequiredService(); + var search = await searchRepository.GetNextQueuedSearchAsync(ct); + + if (search != null) + { + await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct); + + var executionService = scope.ServiceProvider.GetRequiredService(); + await executionService.ExecuteSearchAsync(search, ct); + } + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + /// + /// Purges old DataUpdate entries periodically (every 24 hours). + /// + private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct) + { + if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval) + { + return; + } + + _lastPurgeCheck = DateTime.UtcNow; + + try + { + var repository = scope.ServiceProvider.GetRequiredService(); + var purgedCount = await repository.PurgeOldEntriesAsync( + _options.PurgeRetentionDays, ct); + + if (purgedCount > 0) + { + _logger.LogInformation( + "Purged {Count} DataUpdate records older than {Days} days", + purgedCount, _options.PurgeRetentionDays); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to purge old data update entries"); + } + } + + /// + /// Sends status notification, catching and logging any exceptions. + /// + private async Task NotifyStatusSafeAsync(string status, CancellationToken ct) + { + try + { + await using var scope = _scopeFactory.CreateAsyncScope(); + var notificationService = scope.ServiceProvider.GetRequiredService(); + await notificationService.NotifyStatusAsync(status, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send status notification"); + // Best-effort - don't throw + } + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs new file mode 100644 index 0000000..42477a2 --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs @@ -0,0 +1,546 @@ +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.Core.Models.SearchResults; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Shouldly; + +using MsOptions = Microsoft.Extensions.Options.Options; + +namespace JdeScoping.DataSync.Tests.Services; + +/// +/// Unit tests for SearchExecutionService. +/// Tests cancellation handling, error paths, and success scenarios. +/// +public class SearchExecutionServiceTests +{ + private readonly ISearchRepository _searchRepository; + private readonly ISearchProcessor _searchProcessor; + private readonly IExcelExportService _excelExportService; + private readonly ISearchNotificationService _notificationService; + private readonly IOptions _options; + private readonly ILogger _logger; + + public SearchExecutionServiceTests() + { + _searchRepository = Substitute.For(); + _searchProcessor = Substitute.For(); + _excelExportService = Substitute.For(); + _notificationService = Substitute.For(); + _options = MsOptions.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromSeconds(30) }); + _logger = Substitute.For>(); + } + + #region Constructor Tests + + [Fact] + public void Constructor_WithNullSearchRepository_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + null!, + _searchProcessor, + _excelExportService, + _notificationService, + _options, + _logger)); + } + + [Fact] + public void Constructor_WithNullSearchProcessor_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + _searchRepository, + null!, + _excelExportService, + _notificationService, + _options, + _logger)); + } + + [Fact] + public void Constructor_WithNullExcelExportService_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + _searchRepository, + _searchProcessor, + null!, + _notificationService, + _options, + _logger)); + } + + [Fact] + public void Constructor_WithNullNotificationService_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + null!, + _options, + _logger)); + } + + [Fact] + public void Constructor_WithNullOptions_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + null!, + _logger)); + } + + [Fact] + public void Constructor_WithNullLogger_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => + new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + _options, + null!)); + } + + [Fact] + public void Constructor_WithValidDependencies_CreatesInstance() + { + // Act + var sut = CreateService(); + + // Assert + sut.ShouldNotBeNull(); + } + + #endregion + + #region ExecuteSearchAsync Success Path + + [Fact] + public async Task ExecuteSearchAsync_Success_StartsSearch() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).StartSearchAsync(1, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_ExecutesSearchProcessor() + { + // Arrange + var search = new Search { Id = 42, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 42, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchProcessor.Received(1).ExecuteSearchToModelAsync( + Arg.Is(m => m.Id == 42), + Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_GeneratesExcel() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _excelExportService.Received(1).GenerateAsync( + Arg.Any(), + Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_CompletesWithEndedStatus() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).CompleteSearchAsync(1, true, excelBytes, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_SendsNotifications() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert - Should receive 2 notifications: one for Running, one for Ended + await _notificationService.Received(2).NotifySearchUpdateAsync(search, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_UpdatesSearchStatus() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + search.Status.ShouldBe(SearchStatus.Ended); + search.EndDt.ShouldNotBeNull(); + } + + #endregion + + #region ExecuteSearchAsync Error Path + + [Fact] + public async Task ExecuteSearchAsync_ProcessorThrows_MarksAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Test error")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_ExcelGeneratorThrows_MarksAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Excel generation failed")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_ProcessorThrows_UpdatesSearchStatusToError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Test error")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + search.Status.ShouldBe(SearchStatus.Error); + search.EndDt.ShouldNotBeNull(); + } + + [Fact] + public async Task ExecuteSearchAsync_ProcessorThrows_SendsErrorNotification() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Test error")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert - Should receive 2 notifications: one for Running, one for Error + await _notificationService.Received(2).NotifySearchUpdateAsync(search, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_ErrorDuringComplete_DoesNotThrow() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Test error")); + _searchRepository.CompleteSearchAsync(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("DB error")); + + var sut = CreateService(); + + // Act & Assert - Should not throw + await Should.NotThrowAsync(() => sut.ExecuteSearchAsync(search)); + } + + #endregion + + #region ExecuteSearchAsync Host Shutdown + + [Fact] + public async Task ExecuteSearchAsync_HostShutdown_DoesNotMarkAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + using var cts = new CancellationTokenSource(); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search, cts.Token); + + // Assert - Should NOT call CompleteSearchAsync with success=false + await _searchRepository.DidNotReceive().CompleteSearchAsync( + Arg.Any(), false, Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_HostShutdown_DoesNotUpdateStatusToError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + using var cts = new CancellationTokenSource(); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search, cts.Token); + + // Assert - Status should still be Running (not Error) + search.Status.ShouldBe(SearchStatus.Running); + } + + [Fact] + public async Task ExecuteSearchAsync_HostShutdown_DoesNotThrow() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + using var cts = new CancellationTokenSource(); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var sut = CreateService(); + + // Act & Assert - Should not throw + await Should.NotThrowAsync(() => sut.ExecuteSearchAsync(search, cts.Token)); + } + + #endregion + + #region ExecuteSearchAsync Timeout + + [Fact] + public async Task ExecuteSearchAsync_Timeout_MarksAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var shortTimeoutOptions = MsOptions.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromMilliseconds(50) }); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(async callInfo => + { + var ct = callInfo.Arg(); + await Task.Delay(TimeSpan.FromSeconds(5), ct); // Will be canceled by timeout + return new SearchModel(); + }); + + var sut = new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + shortTimeoutOptions, + _logger); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_Timeout_UpdatesStatusToError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var shortTimeoutOptions = MsOptions.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromMilliseconds(50) }); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(async callInfo => + { + var ct = callInfo.Arg(); + await Task.Delay(TimeSpan.FromSeconds(5), ct); // Will be canceled by timeout + return new SearchModel(); + }); + + var sut = new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + shortTimeoutOptions, + _logger); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + search.Status.ShouldBe(SearchStatus.Error); + } + + #endregion + + #region Notification Resilience + + [Fact] + public async Task ExecuteSearchAsync_NotificationFails_ContinuesExecution() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + _notificationService.NotifySearchUpdateAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("SignalR error")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert - Should still complete successfully despite notification failures + await _searchRepository.Received(1).CompleteSearchAsync(1, true, excelBytes, Arg.Any()); + } + + #endregion + + private SearchExecutionService CreateService() + { + return new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + _options, + _logger); + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs new file mode 100644 index 0000000..80132ce --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs @@ -0,0 +1,129 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +/// +/// Unit tests for SearchRepository. +/// Tests constructor validation and interface contract compliance. +/// Integration tests with actual database are required for full coverage. +/// +public class SearchRepositoryTests +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + public SearchRepositoryTests() + { + _connectionFactory = Substitute.For(); + _logger = NullLogger.Instance; + } + + #region Constructor Tests + + [Fact] + public void Constructor_WithNullConnectionFactory_ThrowsArgumentNullException() + { + // Act & Assert + var exception = Should.Throw(() => + new SearchRepository(null!, _logger)); + + exception.ParamName.ShouldBe("connectionFactory"); + } + + [Fact] + public void Constructor_WithNullLogger_ThrowsArgumentNullException() + { + // Act & Assert + var exception = Should.Throw(() => + new SearchRepository(_connectionFactory, null!)); + + exception.ParamName.ShouldBe("logger"); + } + + [Fact] + public void Constructor_WithValidDependencies_CreatesInstance() + { + // Act + var repository = new SearchRepository(_connectionFactory, _logger); + + // Assert + repository.ShouldNotBeNull(); + } + + #endregion + + #region Interface Contract Tests + + [Fact] + public void SearchRepository_ImplementsISearchRepository() + { + // Arrange & Act + var repository = new SearchRepository(_connectionFactory, _logger); + + // Assert + repository.ShouldBeAssignableTo(); + } + + [Fact] + public void GetNextQueuedSearchAsync_HasCorrectSignature() + { + // Arrange + var repository = new SearchRepository(_connectionFactory, _logger); + + // Act - Verify method exists with correct return type + var methodInfo = typeof(SearchRepository).GetMethod(nameof(SearchRepository.GetNextQueuedSearchAsync)); + + // Assert + methodInfo.ShouldNotBeNull(); + methodInfo.ReturnType.ShouldBe(typeof(Task)); + } + + [Fact] + public void ResetPartialSearchesAsync_HasCorrectSignature() + { + // Arrange + var repository = new SearchRepository(_connectionFactory, _logger); + + // Act - Verify method exists with correct return type + var methodInfo = typeof(SearchRepository).GetMethod(nameof(SearchRepository.ResetPartialSearchesAsync)); + + // Assert + methodInfo.ShouldNotBeNull(); + methodInfo.ReturnType.ShouldBe(typeof(Task)); + } + + [Fact] + public void StartSearchAsync_HasCorrectSignature() + { + // Arrange + var repository = new SearchRepository(_connectionFactory, _logger); + + // Act - Verify method exists with correct return type + var methodInfo = typeof(SearchRepository).GetMethod(nameof(SearchRepository.StartSearchAsync)); + + // Assert + methodInfo.ShouldNotBeNull(); + methodInfo.ReturnType.ShouldBe(typeof(Task)); + } + + [Fact] + public void CompleteSearchAsync_HasCorrectSignature() + { + // Arrange + var repository = new SearchRepository(_connectionFactory, _logger); + + // Act - Verify method exists with correct return type + var methodInfo = typeof(SearchRepository).GetMethod(nameof(SearchRepository.CompleteSearchAsync)); + + // Assert + methodInfo.ShouldNotBeNull(); + methodInfo.ReturnType.ShouldBe(typeof(Task)); + } + + #endregion +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs new file mode 100644 index 0000000..3c77bbf --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs @@ -0,0 +1,672 @@ +using System.Diagnostics.Metrics; +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Models; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Shouldly; + +using MsOptions = Microsoft.Extensions.Options.Options; + +namespace JdeScoping.DataSync.Tests; + +/// +/// Unit tests for WorkProcessor background service. +/// +public class WorkProcessorTests +{ + #region Disabled Service + + [Fact] + public async Task ExecuteAsync_WhenDisabled_StopsImmediately() + { + // Arrange + var options = MsOptions.Create(new WorkProcessorOptions { Enabled = false }); + var scopeFactory = Substitute.For(); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(50); + await sut.StopAsync(CancellationToken.None); + + // Assert - should not throw and scope factory should not be called + scopeFactory.DidNotReceive().CreateAsyncScope(); + } + + #endregion + + #region Startup Cleanup + + [Fact] + public async Task ExecuteAsync_CallsStartupCleanup_CloseOpenUpdateEntries() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + var notificationService = Substitute.For(); + + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_CallsStartupCleanup_ResetPartialSearches() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + var notificationService = Substitute.For(); + + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_WhenCloseOpenEntriesThrows_ContinuesStarting() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + dataUpdateRepo.CloseOpenUpdateEntriesAsync(Arg.Any()) + .ThrowsAsync(new Exception("Database error")); + + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act - should not throw + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert - search repo should still be called (startup continues) + await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_WhenResetPartialSearchesThrows_ContinuesRunning() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + searchRepo.ResetPartialSearchesAsync(Arg.Any()) + .ThrowsAsync(new Exception("Database error")); + + var scheduleChecker = Substitute.For(); + var callCount = 0; + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(x => + { + callCount++; + return new List(); + }); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(50) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act - should not throw + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert - service continues running after startup error + callCount.ShouldBeGreaterThan(0); + } + + #endregion + + #region Priority Processing + + [Fact] + public async Task DoWorkAsync_WhenPendingTasks_ExecutesSyncs() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var orchestrator = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService, + orchestrator: orchestrator); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await orchestrator.Received().ExecutePendingSyncsAsync(Arg.Any()); + } + + [Fact] + public async Task DoWorkAsync_WhenNoPendingTasks_ChecksForQueuedSearches() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await searchRepo.Received().GetNextQueuedSearchAsync(Arg.Any()); + } + + [Fact] + public async Task DoWorkAsync_WhenQueuedSearchExists_ExecutesSearch() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var queuedSearch = new Search { Id = 42 }; + searchRepo.GetNextQueuedSearchAsync(Arg.Any()) + .Returns(queuedSearch); + + var searchExecution = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService, + searchExecution: searchExecution); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await searchExecution.Received().ExecuteSearchAsync( + Arg.Is(s => s.Id == 42), + Arg.Any()); + } + + [Fact] + public async Task DoWorkAsync_WhenPendingTasks_DoesNotProcessSearches() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var orchestrator = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService, + orchestrator: orchestrator); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert - when syncs are pending, searches are not processed + await searchRepo.DidNotReceive().GetNextQueuedSearchAsync(Arg.Any()); + } + + #endregion + + #region Error Handling + + [Fact] + public async Task ExecuteAsync_WhenDoWorkThrows_ContinuesLoop() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var callCount = 0; + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(x => + { + callCount++; + if (callCount == 1) + { + throw new Exception("Test error"); + } + return new List(); + }); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(50) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(250); + await sut.StopAsync(CancellationToken.None); + + // Assert - should have been called multiple times despite first error + callCount.ShouldBeGreaterThan(1); + } + + #endregion + + #region Status Notifications + + [Fact] + public async Task NotifyStatusSafeAsync_WhenNotificationThrows_DoesNotCrash() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + var notificationService = Substitute.For(); + notificationService.NotifyStatusAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new Exception("SignalR error")); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act - should not throw + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert - no exception thrown, service runs + } + + #endregion + + #region Graceful Shutdown + + [Fact] + public async Task ExecuteAsync_WhenCancelled_StopsGracefully() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List()); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromSeconds(10) // Long interval + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + using var cts = new CancellationTokenSource(); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(50); + cts.Cancel(); + + var stopTask = sut.StopAsync(CancellationToken.None); + var completed = await Task.WhenAny(stopTask, Task.Delay(2000)); + + // Assert - should complete without hanging + completed.ShouldBe(stopTask); + } + + [Fact] + public async Task ExecuteAsync_WhenCancelledDuringWork_HandlesOperationCanceledException() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var orchestrator = Substitute.For(); + var cts = new CancellationTokenSource(); + + orchestrator.ExecutePendingSyncsAsync(Arg.Any()) + .Returns(async x => + { + cts.Cancel(); + await Task.Delay(100, x.Arg()); + }); + + var scheduleChecker = Substitute.For(); + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); + var notificationService = Substitute.For(); + + var scopeFactory = SetupScopeFactory( + dataUpdateRepo, + searchRepo, + scheduleChecker, + notificationService, + orchestrator: orchestrator); + + var options = MsOptions.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + var metrics = CreateMetrics(); + + var sut = new WorkProcessor( + scopeFactory, + options, + NullLogger.Instance, + metrics); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(200); + var stopTask = sut.StopAsync(CancellationToken.None); + var completed = await Task.WhenAny(stopTask, Task.Delay(2000)); + + // Assert - should complete gracefully + completed.ShouldBe(stopTask); + cts.Dispose(); + } + + #endregion + + #region Helper Methods + + private static DataSyncMetrics CreateMetrics() + { + var services = new ServiceCollection(); + services.AddMetrics(); + var provider = services.BuildServiceProvider(); + var meterFactory = provider.GetRequiredService(); + return new DataSyncMetrics(meterFactory); + } + + private static IServiceScopeFactory SetupScopeFactory( + IDataUpdateRepository dataUpdateRepo, + ISearchRepository searchRepo, + IScheduleChecker scheduleChecker, + ISearchNotificationService notificationService, + ISyncOrchestrator? orchestrator = null, + ISearchExecutionService? searchExecution = null) + { + orchestrator ??= Substitute.For(); + searchExecution ??= Substitute.For(); + + var services = new ServiceCollection(); + services.AddScoped(_ => dataUpdateRepo); + services.AddScoped(_ => searchRepo); + services.AddScoped(_ => scheduleChecker); + services.AddScoped(_ => notificationService); + services.AddScoped(_ => orchestrator); + services.AddScoped(_ => searchExecution); + + var serviceProvider = services.BuildServiceProvider(); + return serviceProvider.GetRequiredService(); + } + + private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType) + { + return new DataUpdateTask + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + UpdateType = updateType, + MinimumDt = null, + Config = new DataSourceConfig + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + IsEnabled = true, + MassConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 10080 }, + DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 }, + HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 } + } + }; + } + + #endregion +}