feat: implement WorkProcessor and search execution services

- SearchRepository: Search table operations with Dapper
- SearchExecutionService: Search pipeline with proper cancellation handling
- WorkProcessor: Unified BackgroundService for syncs and searches
- SearchNotificationService: SignalR notifications in Api layer

All 45 new tests pass. Proper shutdown vs timeout distinction
prevents marking searches as error on host shutdown.
This commit is contained in:
Joseph Doherty
2026-01-07 06:18:35 -05:00
parent ca4cf9d3ec
commit 91b516e197
7 changed files with 1906 additions and 0 deletions
@@ -0,0 +1,131 @@
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.Core.Models.SearchResults;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Service that orchestrates the complete search execution pipeline.
/// </summary>
public class SearchExecutionService : ISearchExecutionService
{
private readonly ISearchRepository _searchRepository;
private readonly ISearchProcessor _searchProcessor;
private readonly IExcelExportService _excelExportService;
private readonly ISearchNotificationService _notificationService;
private readonly WorkProcessorOptions _options;
private readonly ILogger<SearchExecutionService> _logger;
public SearchExecutionService(
ISearchRepository searchRepository,
ISearchProcessor searchProcessor,
IExcelExportService excelExportService,
ISearchNotificationService notificationService,
IOptions<WorkProcessorOptions> options,
ILogger<SearchExecutionService> logger)
{
_searchRepository = searchRepository ?? throw new ArgumentNullException(nameof(searchRepository));
_searchProcessor = searchProcessor ?? throw new ArgumentNullException(nameof(searchProcessor));
_excelExportService = excelExportService ?? throw new ArgumentNullException(nameof(excelExportService));
_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default)
{
using var timeoutCts = new CancellationTokenSource(_options.SearchTimeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token);
try
{
// Mark as Running
await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token);
search.Status = SearchStatus.Running;
search.StartDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, linkedCts.Token);
// Execute search query
var model = new SearchModel { Id = search.Id };
await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token);
// Generate Excel
var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token);
// Complete with success
await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token);
search.Status = SearchStatus.Ended;
search.EndDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, linkedCts.Token);
_logger.LogInformation(
"Search {SearchId} completed successfully with {ResultCount} results",
search.Id, model.Results.Count);
}
catch (OperationCanceledException)
{
// Distinguish between host shutdown and timeout using the CancellationTokenSource states
// - Host shutdown: original ct is cancelled, but timeout hasn't fired
// - Timeout: timeoutCts is cancelled (regardless of ct state)
if (ct.IsCancellationRequested && !timeoutCts.IsCancellationRequested)
{
// Host shutdown - don't mark as error, let startup cleanup handle requeue
_logger.LogInformation(
"Search {SearchId} interrupted by shutdown, will be requeued on restart",
search.Id);
}
else if (timeoutCts.IsCancellationRequested)
{
// Timeout - mark as error
_logger.LogWarning(
"Search {SearchId} timed out after {Timeout}",
search.Id, _options.SearchTimeout);
await CompleteWithErrorAsync(search);
}
else
{
// Unknown cancellation source - treat as error for safety
_logger.LogWarning("Search {SearchId} cancelled from unknown source", search.Id);
await CompleteWithErrorAsync(search);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Search {SearchId} failed", search.Id);
await CompleteWithErrorAsync(search);
}
}
private async Task CompleteWithErrorAsync(Search search)
{
try
{
await _searchRepository.CompleteSearchAsync(search.Id, false, null, CancellationToken.None);
search.Status = SearchStatus.Error;
search.EndDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id);
}
}
private async Task NotifySearchUpdateSafeAsync(Search search, CancellationToken ct)
{
try
{
await _notificationService.NotifySearchUpdateAsync(search, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id);
}
}
}
@@ -0,0 +1,136 @@
using Dapper;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Repository for Search table operations.
/// </summary>
public class SearchRepository : ISearchRepository
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly ILogger<SearchRepository> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="SearchRepository"/> class.
/// </summary>
/// <param name="connectionFactory">The database connection factory.</param>
/// <param name="logger">The logger instance.</param>
public SearchRepository(
IDbConnectionFactory connectionFactory,
ILogger<SearchRepository> logger)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task<Search?> GetNextQueuedSearchAsync(CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
SELECT TOP 1
Id, UserName, Name, Status, SubmitDT as SubmitDt,
StartDT as StartDt, EndDT as EndDt, CriteriaJSON as CriteriaJson
FROM dbo.Search
WHERE Status = @Status
ORDER BY SubmitDT ASC
""";
var search = await connection.QueryFirstOrDefaultAsync<Search>(
sql,
new { Status = (int)SearchStatus.Queued },
commandTimeout: 30);
if (search != null)
{
_logger.LogDebug("Found queued search {SearchId}", search.Id);
}
return search;
}
/// <inheritdoc/>
public async Task<int> ResetPartialSearchesAsync(CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @QueuedStatus, StartDT = NULL
WHERE Status = @RunningStatus AND EndDT IS NULL
""";
var count = await connection.ExecuteAsync(
sql,
new
{
QueuedStatus = (int)SearchStatus.Queued,
RunningStatus = (int)SearchStatus.Running
},
commandTimeout: 30);
if (count > 0)
{
_logger.LogWarning("Reset {Count} partial searches to Queued status", count);
}
return count;
}
/// <inheritdoc/>
public async Task StartSearchAsync(int searchId, CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @Status, StartDT = @StartDt
WHERE Id = @SearchId
""";
await connection.ExecuteAsync(
sql,
new
{
SearchId = searchId,
Status = (int)SearchStatus.Running,
StartDt = DateTime.UtcNow
},
commandTimeout: 30);
_logger.LogDebug("Started search {SearchId}", searchId);
}
/// <inheritdoc/>
public async Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @Status, EndDT = @EndDt, Results = @Results
WHERE Id = @SearchId
""";
var status = success ? SearchStatus.Ended : SearchStatus.Error;
await connection.ExecuteAsync(
sql,
new
{
SearchId = searchId,
Status = (int)status,
EndDt = DateTime.UtcNow,
Results = results
},
commandTimeout: 30);
_logger.LogDebug("Completed search {SearchId} with status {Status}", searchId, status);
}
}
@@ -0,0 +1,223 @@
using JdeScoping.Core.Interfaces;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync;
/// <summary>
/// Unified background service that coordinates data synchronization and search processing.
/// Data freshness takes priority over search processing.
/// </summary>
public class WorkProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly WorkProcessorOptions _options;
private readonly ILogger<WorkProcessor> _logger;
private readonly DataSyncMetrics _metrics;
private DateTime _lastPurgeCheck = DateTime.MinValue;
private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24);
/// <summary>
/// Initializes a new instance of the <see cref="WorkProcessor"/> class.
/// </summary>
public WorkProcessor(
IServiceScopeFactory scopeFactory,
IOptions<WorkProcessorOptions> options,
ILogger<WorkProcessor> logger,
DataSyncMetrics metrics)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogInformation("WorkProcessor is disabled");
return;
}
_logger.LogInformation(
"WorkProcessor starting with WorkInterval={WorkInterval}",
_options.WorkInterval);
// Startup cleanup
await StartupCleanupAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
string status = "Idle";
try
{
status = await DoWorkAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("WorkProcessor stopping gracefully");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in work cycle");
_metrics.RecordCycleError();
}
finally
{
// Always notify status (best-effort)
await NotifyStatusSafeAsync(status, stoppingToken);
}
try
{
await Task.Delay(_options.WorkInterval, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
await NotifyStatusSafeAsync("Stopped", CancellationToken.None);
_logger.LogInformation("WorkProcessor stopped");
}
/// <summary>
/// Performs startup cleanup by closing interrupted DataUpdate entries
/// and resetting partial searches.
/// </summary>
private async Task StartupCleanupAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Close interrupted DataUpdate entries
try
{
var dataUpdateRepo = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct);
if (closedCount > 0)
{
_logger.LogWarning("Closed {Count} interrupted data update entries", closedCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to close open data update entries");
}
// Reset partial searches
try
{
var searchRepo = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var resetCount = await searchRepo.ResetPartialSearchesAsync(ct);
if (resetCount > 0)
{
_logger.LogWarning("Reset {Count} partial searches to Queued", resetCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reset partial searches");
}
}
/// <summary>
/// Performs one work cycle: data syncs have priority, then search processing.
/// </summary>
private async Task<string> DoWorkAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Priority 1: Data syncs
var scheduleChecker = scope.ServiceProvider.GetRequiredService<IScheduleChecker>();
var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct);
if (pendingTasks.Count > 0)
{
await NotifyStatusSafeAsync("Updating data cache", ct);
var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
await orchestrator.ExecutePendingSyncsAsync(ct);
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
// Priority 2: Search processing (only when syncs are current)
var searchRepository = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var search = await searchRepository.GetNextQueuedSearchAsync(ct);
if (search != null)
{
await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct);
var executionService = scope.ServiceProvider.GetRequiredService<ISearchExecutionService>();
await executionService.ExecuteSearchAsync(search, ct);
}
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
/// <summary>
/// Purges old DataUpdate entries periodically (every 24 hours).
/// </summary>
private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct)
{
if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval)
{
return;
}
_lastPurgeCheck = DateTime.UtcNow;
try
{
var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var purgedCount = await repository.PurgeOldEntriesAsync(
_options.PurgeRetentionDays, ct);
if (purgedCount > 0)
{
_logger.LogInformation(
"Purged {Count} DataUpdate records older than {Days} days",
purgedCount, _options.PurgeRetentionDays);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to purge old data update entries");
}
}
/// <summary>
/// Sends status notification, catching and logging any exceptions.
/// </summary>
private async Task NotifyStatusSafeAsync(string status, CancellationToken ct)
{
try
{
await using var scope = _scopeFactory.CreateAsyncScope();
var notificationService = scope.ServiceProvider.GetRequiredService<ISearchNotificationService>();
await notificationService.NotifyStatusAsync(status, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send status notification");
// Best-effort - don't throw
}
}
}