docs: add WorkProcessor design and implementation documentation

- WorkProcessorReport.md: Analysis of legacy work processor from OLD solution
- Design document with clean architecture and component specifications
- Implementation plan with 15 TDD tasks
This commit is contained in:
Joseph Doherty
2026-01-07 06:30:54 -05:00
parent 5ee920a399
commit 8630a5d32b
3 changed files with 2752 additions and 0 deletions
+407
View File
@@ -0,0 +1,407 @@
# Work Processor Report
This document describes the background work processor from the legacy JDE Scoping Tool (LotFinder) WorkerService. The work processor is responsible for coordinating data synchronization and search processing.
## Executive Summary
The `WorkProcessor` class is the main orchestration component that runs as a Windows service (via Topshelf). It implements a continuous polling loop that:
1. Checks if any data syncs are overdue
2. If overdue syncs exist → performs data updates in parallel
3. If all syncs are current → processes the next queued search
4. Sleeps for 5 seconds and repeats
**Key Design Principle:** Data freshness takes priority over search processing. Searches are only processed when all data caches are up to date.
---
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────────┐
│ WorkProcessor │
│ (Topshelf Service) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │
│ │ Start() │────▶│ DoWork() │────▶│ Stop() │ │
│ └──────────────┘ └───────┬───────┘ └──────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────────┐ ┌───────────┐ ┌─────────────────┐ │
│ │ UpdateProcessor │ │ Search │ │ SignalR Hub │ │
│ │ (Data Sync) │ │ Processor │ │ (Status Update) │ │
│ └──────────────────┘ └───────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
---
## Source Files
| File | Purpose |
|------|---------|
| `OLD/WorkerService/Process/WorkProcessor.cs` | Main orchestration loop |
| `OLD/WorkerService/Process/UpdateProcessor.cs` | Data sync coordination |
| `OLD/WorkerService/Process/UpdateProcessor.DataUpdateEntry.cs` | DataUpdate table logging |
| `OLD/WorkerService/Process/UpdateProcessor.TableManagement.cs` | Staging/merge SQL generation |
| `OLD/WorkerService/Process/LotFinderDBExt.cs` | Search query execution |
| `OLD/WorkerService/Process/ExcelWriter.cs` | Excel result generation |
| `OLD/WorkerService/Program.cs` | Topshelf service configuration |
---
## Main Work Loop
### Constants
| Constant | Value | Description |
|----------|-------|-------------|
| `WAIT_INTERVAL` | 5000 ms (5 seconds) | Sleep duration between work cycles |
### Loop Structure
```
Start()
└── Thread spawned
└── while(true)
├── DoWork()
└── cancel.WaitOne(WAIT_INTERVAL)
└── if signaled → break
```
The loop runs indefinitely until `Stop()` is called, which sets the `cancel` ManualResetEvent.
---
## DoWork() Algorithm
The `DoWork()` method implements the following decision flow:
```
DoWork()
├─▶ GetPendingUpdateTasks()
│ └── Returns list of overdue data syncs
├─▶ IF pending updates exist:
│ ├── Status = "Updating data cache"
│ ├── Parallel.ForEach(pending, MaxDegreeOfParallelism=8)
│ │ └── UpdateProcessor.DoUpdate(task)
│ └── RETURN (skip search processing this cycle)
├─▶ ELSE (all data syncs current):
│ │
│ ├── ResetPartialSearches()
│ │ └── Reset any searches that started but never completed
│ │
│ ├── GetNextSearch()
│ │ └── Get oldest queued search (Status=1, ORDER BY SubmitDT)
│ │
│ └── IF search found:
│ ├── Status = "Processing search #{ID}"
│ ├── StartSearch(search)
│ │ └── Status → 2 (Started), set StartDT
│ ├── PublishSearchUpdate(search) → SignalR
│ │
│ ├── Execute search query
│ ├── Generate Excel results
│ ├── Save debug file: search_{ID}.xlsx
│ │
│ ├── CompleteSearch(search, success=true)
│ │ └── Status → 3 (Ended), store Results
│ └── PublishSearchUpdate(search) → SignalR
│ ON EXCEPTION:
│ ├── Log error
│ ├── CompleteSearch(search, success=false)
│ │ └── Status → 4 (Error)
│ └── PublishSearchUpdate(search) → SignalR
└── Status = "Idle"
```
---
## Data Sync Scheduling
### How "Overdue" Is Determined
The `GetPendingUpdateTasks()` method checks each configured data source against the `LastDataUpdates` view:
```sql
-- LastDataUpdates view returns the most recent successful update per table/type
SELECT TableName,
COALESCE([3], '1970-01-01') AS MassUpdateDT, -- UpdateType 3 = Mass
COALESCE([2], '1970-01-01') AS DailyUpdateDT, -- UpdateType 2 = Daily
COALESCE([1], '1970-01-01') AS HourlyUpdateDT -- UpdateType 1 = Hourly
FROM (pivot query on DataUpdate table)
```
### Priority Order (Exclusive)
Only ONE update type is selected per data source per cycle:
| Priority | Condition | Update Type |
|----------|-----------|-------------|
| 1 | No record exists OR `Now > MassUpdateDT + MassInterval` | **Mass** (full reload) |
| 2 | `Now > DailyUpdateDT + DailyInterval` | **Daily** (incremental) |
| 3 | `Now > HourlyUpdateDT + HourlyInterval` | **Hourly** (incremental) |
### Lookback Multiplier
For incremental updates (Daily/Hourly), the query uses a **lookback of 3× the interval**:
```csharp
// Daily update example
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);
// If DailyInterval = 1440 (24 hours), lookback = 3 days
// Hourly update example (note: uses Daily values - see bug below)
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);
```
**Purpose:** The 3× multiplier provides overlap to catch any records that may have been missed due to timing edge cases or delays.
> **⚠️ Legacy Bug:** In `GetPendingUpdateTasks()`, the Hourly update incorrectly uses `DailyUpdateDT` and `DailyUpdateConfig.Interval` instead of the hourly equivalents. This means hourly updates use a 3-day lookback instead of 3 hours. The separate `DoUpdate(tableName, updateType)` overload correctly uses hourly values, but this path is not called from the main work loop.
### Parallel Execution
Data updates run in parallel with controlled concurrency:
```csharp
Parallel.ForEach(pending,
new ParallelOptions { MaxDegreeOfParallelism = 8 },
task => UpdateProcessor.DoUpdate(task));
```
---
## Data Update Process
Each `DoUpdate(task)` execution:
1. **Log Start**: Insert `DataUpdate` record with `NumberRecords = -2` (in-progress marker)
2. **Prepurge** (if configured): `TRUNCATE TABLE {destination}`
3. **Fetch Data**: Call the configured data fetch function (JDE/CMS query)
4. **Batch Processing** (1M records per batch):
- Create staging table (`#Staging{table}`) - local temp table
- Create index on staging table (PK columns + LastUpdateDT/ReleaseDate)
- Disable non-PK indexes on staging table
- Bulk copy data to staging (batch size: 10,000)
- Rebuild indexes on staging table
- Create temp table (`#{table}`) with deduplication via `ROW_NUMBER() OVER (PARTITION BY {PK} ORDER BY LastUpdateDT)`
- MERGE to destination table (only updates when `TARGET.LastUpdateDT < SOURCE.LastUpdateDT`)
5. **Post-Processing**: Run configured action (e.g., `PostProcessMisData`)
6. **Reindex** (if configured): Rebuild all non-PK indexes on destination
7. **Log End**: Update `DataUpdate` record with `WasSuccessful`, `NumberRecords`
### In-Progress Detection
Records with `NumberRecords = -2` indicate updates that started but never completed (crash/restart).
> **Note:** The codebase contains a `CloseOpenUpdateEntries()` method to clean up these records, but it is **never called** in the current implementation. This means crashed updates remain with `NumberRecords = -2` indefinitely. The cleanup SQL exists but is unused:
```sql
-- Defined but never invoked
UPDATE dbo.DataUpdate
SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1
WHERE NumberRecords = -2
```
---
## Search Processing
### Search Status Values
| Value | Enum | Description |
|-------|------|-------------|
| 0 | `New` | Created but not submitted |
| 1 | `Submitted` | Queued for processing |
| 2 | `Started` | Currently being processed |
| 3 | `Ended` | Completed successfully |
| 4 | `Error` | Failed with error |
### Partial Search Recovery
Before processing new searches, the system resets any "orphaned" searches:
```sql
-- ResetPartialSearches stored procedure
UPDATE dbo.Search
SET Status = 1, StartDT = NULL
WHERE StartDT IS NOT NULL AND EndDT IS NULL
```
This handles cases where the service crashed mid-search.
### Search Execution Flow
1. **GetNextSearch**: Query for oldest `Status = 1` search
2. **StartSearch**: Update to `Status = 2`, set `StartDT`
3. **Execute Query**: Run the generated SQL against local cache
4. **Generate Excel**: Create Excel file using EPPlus
5. **CompleteSearch**: Update `Status = 3` (or 4 on error), store `Results` blob
---
## SignalR Integration
### Status Updates
The `Status` property publishes changes to the SignalR hub:
```csharp
public string Status
{
set
{
if (!string.Equals(_status, value))
{
_status = value;
logger.Info("Status: {0}", _status);
PublishStatus(_status); // → SignalR
}
}
}
```
### Hub Methods
| Method | Payload | When Called |
|--------|---------|-------------|
| `SetStatus` | `{ Message, Timestamp }` | On status property change |
| `PublishSearchUpdate` | `SearchUpdate(search)` | After StartSearch, CompleteSearch |
### Connection Pattern
Each publish creates a new `HubConnection`:
```csharp
string hubHost = ConfigurationManager.AppSettings["HubHost"];
using (HubConnection hubConnection = new HubConnection(hubHost))
{
IHubProxy hubProxy = hubConnection.CreateHubProxy("StatusHub");
await hubConnection.Start();
await hubProxy.Invoke("SetStatus", update);
}
```
**Note:** This is an `async void` fire-and-forget pattern. Failures are logged but don't interrupt processing.
---
## Timing Summary
| Event | Timing |
|-------|--------|
| Work loop interval | 5 seconds |
| Mass sync check | Every ~7 days (10080 minutes) |
| Daily sync check | Every ~24 hours (1440 minutes) |
| Hourly sync check | Every ~1 hour (60 minutes) |
| Query lookback (incremental) | 3× the interval |
| Data update parallelism | Up to 8 concurrent |
| Bulk copy batch size | 10,000 rows |
| Data batch grouping | 1,000,000 rows |
---
## Database Tables
### DataUpdate
Tracks all data sync operations:
```sql
CREATE TABLE dbo.DataUpdate (
ID INT IDENTITY PRIMARY KEY,
SourceSystem VARCHAR(50), -- 'JDE', 'CMS'
SourceData VARCHAR(50), -- 'WORKORDER', 'LOTUSAGE', etc.
TableName VARCHAR(50), -- Destination table name
StartDT DATETIME,
EndDT DATETIME,
UpdateType SMALLINT, -- 1=Hourly, 2=Daily, 3=Mass
WasSuccessful BIT,
NumberRecords BIGINT -- -2=in-progress, -1=crashed, N=count
)
```
### Search
Tracks search requests and results:
```sql
CREATE TABLE dbo.Search (
ID INT IDENTITY PRIMARY KEY,
UserName VARCHAR(128),
Name VARCHAR(128),
Status SMALLINT, -- 0-4 (see SearchStatus enum)
SubmitDT DATETIME,
StartDT DATETIME,
EndDT DATETIME,
Criteria VARCHAR(MAX), -- JSON serialized search criteria
Results VARBINARY(MAX) -- Excel file bytes
)
```
---
## Error Handling
| Scenario | Behavior |
|----------|----------|
| Data update failure | Logged, continues with next update |
| Search execution failure | Status set to Error (4), logged |
| SignalR publish failure | Logged, processing continues |
| Top-level DoWork exception | Logged, swallowed, loop continues |
| Service crash during update | `NumberRecords = -2` cleaned up on restart |
| Service crash during search | `ResetPartialSearches()` requeues on restart |
---
## Configuration
### App.config Settings
| Key | Purpose |
|-----|---------|
| `HubHost` | SignalR hub URL for status updates |
| `querytimeout` | SQL query timeout (default: 600 seconds) |
### dsconfig/*.json
Each data source has a JSON configuration file defining:
- Source system and data identifiers
- Destination table name
- Data fetch function reference
- Schedule configurations (Mass/Daily/Hourly)
See `DataSyncReport.md` for complete configuration details.
---
## Known Issues in Legacy Implementation
| Issue | Severity | Description | Location |
|-------|----------|-------------|----------|
| Hourly lookback bug | Medium | `GetPendingUpdateTasks()` uses `DailyUpdateDT` and `DailyInterval` for hourly updates instead of hourly equivalents. Results in 3-day lookback instead of 3 hours. | `UpdateProcessor.cs:101-103` |
| Unused cleanup code | Low | `CloseOpenUpdateEntries()` is defined but never called. Crashed updates (`NumberRecords = -2`) are never cleaned up. | `UpdateProcessor.DataUpdateEntry.cs:28` |
| Fire-and-forget SignalR | Low | `PublishStatus` and `PublishSearchUpdate` are `async void` methods. Failures don't propagate and new connections are created per-call. | `WorkProcessor.cs:61-107` |
---
## Key Differences from NEW Implementation
| Aspect | OLD (Legacy) | NEW (.NET 10) |
|--------|--------------|---------------|
| Hosting | Topshelf Windows Service | .NET BackgroundService |
| Loop pattern | Manual thread + sleep | IHostedService + Timer |
| Parallelism | `Parallel.ForEach` | Configurable via options |
| SignalR | Legacy ASP.NET SignalR | ASP.NET Core SignalR |
| Configuration | JSON files + App.config | `appsettings.json` + `pipelines.json` |
| Scheduling | Interval-based polling | Same pattern, configurable defaults |
| Hourly lookback | Uses daily values (bug) | Should use correct hourly values |
| Cleanup on start | Not implemented | Should clean up orphaned updates |
+717
View File
@@ -0,0 +1,717 @@
# WorkProcessor Design
## Overview
The WorkProcessor is a unified background service that coordinates both data synchronization and search processing for the JDE Scoping Tool. It implements the legacy work loop pattern where data freshness takes priority over search processing.
## Architecture
```
┌─────────────────────────────────────────────────────────────────────┐
│ WorkProcessor │
│ (BackgroundService) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ExecuteAsync (main loop) │
│ ├── Startup: CloseOpenUpdateEntries + ResetPartialSearches │
│ │ │
│ └── while (!cancelled): │
│ ├── DoWorkAsync() │
│ │ ├── Check pending data syncs (IScheduleChecker) │
│ │ │ └── If any → ISyncOrchestrator.ExecutePendingSyncsAsync│
│ │ │ │
│ │ └── If no pending syncs: │
│ │ ├── ISearchRepository.GetNextQueuedSearchAsync() │
│ │ └── ISearchExecutionService.ExecuteSearchAsync() │
│ │ │
│ ├── Periodic: PurgeOldDataUpdateEntries │
│ └── Task.Delay(WorkInterval) │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
## Design Principles
1. **Clean Architecture**: Dependencies flow inward; infrastructure depends on abstractions
2. **Single Responsibility**: WorkProcessor only coordinates; actual work delegated to specialized services
3. **Priority-Based Processing**: Data syncs always take precedence over search processing
4. **Graceful Recovery**: Handles interrupted operations from prior runs at startup
5. **Observable**: Status notifications via injected abstraction
## Layer Boundaries
```
┌─────────────────────────────────────────────────────────────────────┐
│ JdeScoping.Host / JdeScoping.Api (Presentation Layer) │
│ ├── StatusHub (SignalR) │
│ └── SearchNotificationService : ISearchNotificationService │
├─────────────────────────────────────────────────────────────────────┤
│ JdeScoping.DataSync (Application/Infrastructure Layer) │
│ ├── WorkProcessor : BackgroundService │
│ ├── SearchExecutionService : ISearchExecutionService │
│ ├── SearchRepository : ISearchRepository │
│ └── Uses: ISyncOrchestrator, IScheduleChecker, IDataUpdateRepository│
├─────────────────────────────────────────────────────────────────────┤
│ JdeScoping.Core (Domain/Contracts Layer) │
│ ├── ISearchNotificationService (interface only) │
│ ├── ISearchProcessor (interface only) │
│ ├── IExcelExportService (interface only - existing) │
│ └── Models: Search, SearchStatus, SearchUpdate, StatusUpdate │
├─────────────────────────────────────────────────────────────────────┤
│ JdeScoping.DataAccess (Infrastructure Layer) │
│ └── SearchProcessor : ISearchProcessor │
├─────────────────────────────────────────────────────────────────────┤
│ JdeScoping.ExcelIO (Infrastructure Layer) │
│ └── ExcelExportService : IExcelExportService │
└─────────────────────────────────────────────────────────────────────┘
```
**Key Boundary Rule**: DataSync does NOT reference Api. SignalR notification implementation lives in Host/Api; DataSync only depends on `ISearchNotificationService` interface from Core.
## Components
### WorkProcessor (BackgroundService)
Main orchestration service implementing the work loop.
```csharp
public class WorkProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IOptions<WorkProcessorOptions> _options;
private readonly ILogger<WorkProcessor> _logger;
private readonly DataSyncMetrics _metrics;
private DateTime _lastPurgeCheck = DateTime.MinValue;
private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Value.Enabled)
{
_logger.LogInformation("WorkProcessor is disabled");
return;
}
_logger.LogInformation(
"WorkProcessor starting with WorkInterval={WorkInterval}",
_options.Value.WorkInterval);
// Startup cleanup
await StartupCleanupAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
string status = "Idle";
try
{
status = await DoWorkAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("WorkProcessor stopping gracefully");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in work cycle");
_metrics.RecordCycleError();
}
finally
{
// Always notify status (best-effort)
await NotifyStatusSafeAsync(status, stoppingToken);
}
try
{
await Task.Delay(_options.Value.WorkInterval, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
await NotifyStatusSafeAsync("Stopped", CancellationToken.None);
_logger.LogInformation("WorkProcessor stopped");
}
private async Task StartupCleanupAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Close interrupted DataUpdate entries
try
{
var dataUpdateRepo = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct);
if (closedCount > 0)
{
_logger.LogWarning("Closed {Count} interrupted data update entries", closedCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to close open data update entries");
}
// Reset partial searches
try
{
var searchRepo = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var resetCount = await searchRepo.ResetPartialSearchesAsync(ct);
if (resetCount > 0)
{
_logger.LogWarning("Reset {Count} partial searches to Queued", resetCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reset partial searches");
}
}
private async Task<string> DoWorkAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Priority 1: Data syncs
var scheduleChecker = scope.ServiceProvider.GetRequiredService<IScheduleChecker>();
var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct);
if (pendingTasks.Count > 0)
{
await NotifyStatusSafeAsync("Updating data cache", ct);
var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
await orchestrator.ExecutePendingSyncsAsync(ct);
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
// Priority 2: Search processing (only when syncs are current)
var searchRepository = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var search = await searchRepository.GetNextQueuedSearchAsync(ct);
if (search != null)
{
await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct);
var executionService = scope.ServiceProvider.GetRequiredService<ISearchExecutionService>();
await executionService.ExecuteSearchAsync(search, ct);
}
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct)
{
if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval)
{
return;
}
_lastPurgeCheck = DateTime.UtcNow;
try
{
var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var purgedCount = await repository.PurgeOldEntriesAsync(
_options.Value.PurgeRetentionDays, ct);
if (purgedCount > 0)
{
_logger.LogInformation(
"Purged {Count} DataUpdate records older than {Days} days",
purgedCount, _options.Value.PurgeRetentionDays);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to purge old data update entries");
}
}
private async Task NotifyStatusSafeAsync(string status, CancellationToken ct)
{
try
{
await using var scope = _scopeFactory.CreateAsyncScope();
var notificationService = scope.ServiceProvider.GetRequiredService<ISearchNotificationService>();
await notificationService.NotifyStatusAsync(status, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send status notification");
// Best-effort - don't throw
}
}
}
```
### WorkProcessorOptions
Configuration for the work processor.
```csharp
public class WorkProcessorOptions
{
public const string SectionName = "WorkProcessor";
/// <summary>
/// Whether the work processor is enabled. Default: true.
/// </summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// Interval between work cycles. Default: 5 seconds.
/// </summary>
public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Search execution timeout. Default: 30 minutes.
/// </summary>
public TimeSpan SearchTimeout { get; set; } = TimeSpan.FromMinutes(30);
/// <summary>
/// Number of days to retain DataUpdate records. Default: 30.
/// </summary>
public int PurgeRetentionDays { get; set; } = 30;
}
```
### ISearchRepository
Interface for search table operations (in JdeScoping.DataSync.Contracts).
```csharp
public interface ISearchRepository
{
/// <summary>
/// Gets the next queued search ordered by SubmitDT (FIFO).
/// </summary>
Task<Search?> GetNextQueuedSearchAsync(CancellationToken ct = default);
/// <summary>
/// Resets partial searches (Running but not Ended) back to Queued status.
/// Returns count of reset searches.
/// </summary>
Task<int> ResetPartialSearchesAsync(CancellationToken ct = default);
/// <summary>
/// Marks search as Running with StartDT = now.
/// </summary>
Task StartSearchAsync(int searchId, CancellationToken ct = default);
/// <summary>
/// Marks search as Ended (success) or Error (failure) with EndDT and optional Results.
/// </summary>
Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default);
}
```
### ISearchProcessor (NEW - in JdeScoping.Core.Interfaces)
Interface for search query execution.
```csharp
public interface ISearchProcessor
{
/// <summary>
/// Executes search and materializes all results into SearchModel.
/// </summary>
Task<SearchModel> ExecuteSearchToModelAsync(SearchModel model, CancellationToken ct = default);
}
```
### ISearchExecutionService
Interface for search execution pipeline (in JdeScoping.DataSync.Contracts).
```csharp
public interface ISearchExecutionService
{
/// <summary>
/// Executes the complete search pipeline: query, Excel generation, result storage.
/// Handles status transitions and notifications.
/// </summary>
Task ExecuteSearchAsync(Search search, CancellationToken ct = default);
}
```
### SearchExecutionService
Implementation with proper cancellation handling.
```csharp
public class SearchExecutionService : ISearchExecutionService
{
private readonly ISearchRepository _searchRepository;
private readonly ISearchProcessor _searchProcessor;
private readonly IExcelExportService _excelExportService;
private readonly ISearchNotificationService _notificationService;
private readonly IOptions<WorkProcessorOptions> _options;
private readonly ILogger<SearchExecutionService> _logger;
public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default)
{
using var timeoutCts = new CancellationTokenSource(_options.Value.SearchTimeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token);
try
{
// Mark as Running
await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token);
search.Status = SearchStatus.Running;
search.StartDT = DateTime.UtcNow;
await _notificationService.NotifySearchUpdateAsync(search, linkedCts.Token);
// Execute search query
var model = new SearchModel { Id = search.Id };
await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token);
// Generate Excel
var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token);
// Complete with success
await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token);
search.Status = SearchStatus.Ended;
search.EndDT = DateTime.UtcNow;
await _notificationService.NotifySearchUpdateAsync(search, linkedCts.Token);
_logger.LogInformation(
"Search {SearchId} completed successfully with {ResultCount} results",
search.Id, model.Results.Count);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Host shutdown - don't mark as error, let startup cleanup handle it
_logger.LogInformation(
"Search {SearchId} interrupted by shutdown, will be requeued on restart",
search.Id);
// Don't rethrow - let the caller handle graceful shutdown
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
// Timeout - mark as error
_logger.LogWarning(
"Search {SearchId} timed out after {Timeout}",
search.Id, _options.Value.SearchTimeout);
await CompleteWithErrorAsync(search, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Search {SearchId} failed", search.Id);
await CompleteWithErrorAsync(search, CancellationToken.None);
}
}
private async Task CompleteWithErrorAsync(Search search, CancellationToken ct)
{
try
{
await _searchRepository.CompleteSearchAsync(search.Id, false, null, ct);
search.Status = SearchStatus.Error;
search.EndDT = DateTime.UtcNow;
await _notificationService.NotifySearchUpdateAsync(search, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id);
}
}
}
```
### ISearchNotificationService (in JdeScoping.Core.Interfaces)
Interface for SignalR notifications - lives in Core to avoid dependency issues.
```csharp
public interface ISearchNotificationService
{
/// <summary>
/// Notifies clients of search status update.
/// </summary>
Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default);
/// <summary>
/// Notifies clients of work processor status change.
/// </summary>
Task NotifyStatusAsync(string status, CancellationToken ct = default);
}
```
### SearchNotificationService (in JdeScoping.Host or JdeScoping.Api)
Implementation using ASP.NET Core SignalR - lives in presentation layer.
```csharp
public class SearchNotificationService : ISearchNotificationService
{
private readonly IHubContext<StatusHub> _hubContext;
private readonly ILogger<SearchNotificationService> _logger;
public SearchNotificationService(
IHubContext<StatusHub> hubContext,
ILogger<SearchNotificationService> logger)
{
_hubContext = hubContext;
_logger = logger;
}
public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default)
{
try
{
var update = new SearchUpdate
{
Id = search.Id,
Status = search.Status,
StartDT = search.StartDT,
EndDT = search.EndDT
};
await _hubContext.Clients.All.SendAsync("SearchUpdated", update, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send search update notification");
}
}
public async Task NotifyStatusAsync(string status, CancellationToken ct = default)
{
try
{
var update = new StatusUpdate
{
Message = status,
Timestamp = DateTime.UtcNow
};
await _hubContext.Clients.All.SendAsync("StatusUpdated", update, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send status notification");
}
}
}
```
## Configuration
### appsettings.json
```json
{
"WorkProcessor": {
"Enabled": true,
"WorkInterval": "00:00:05",
"SearchTimeout": "00:30:00",
"PurgeRetentionDays": 30
},
"DataSync": {
"MaxDegreeOfParallelism": 8,
"LookbackMultiplier": 3,
"SyncTimeoutSeconds": 3600
}
}
```
## Dependency Injection
### In JdeScoping.DataSync/DependencyInjection.cs
```csharp
public static IServiceCollection AddDataSyncServices(
this IServiceCollection services,
IConfiguration configuration)
{
// Options
services.Configure<WorkProcessorOptions>(
configuration.GetSection(WorkProcessorOptions.SectionName));
services.Configure<DataSyncOptions>(
configuration.GetSection(DataSyncOptions.SectionName));
// Existing DataSync services...
services.AddSingleton<IEtlPipelineFactory, EtlPipelineFactory>();
services.AddSingleton<DataSyncMetrics>();
services.AddScoped<ISyncOrchestrator, SyncOrchestrator>();
services.AddScoped<IScheduleChecker, ScheduleChecker>();
services.AddScoped<ITableSyncOperation, TableSyncOperation>();
services.AddScoped<IDataUpdateRepository, DataUpdateRepository>();
// Search services (scoped for parallel isolation)
services.AddScoped<ISearchRepository, SearchRepository>();
services.AddScoped<ISearchExecutionService, SearchExecutionService>();
// Background service
services.AddHostedService<WorkProcessor>();
return services;
}
```
### In JdeScoping.Host/Program.cs (or similar)
```csharp
// Register SignalR notification service (presentation layer implementation)
builder.Services.AddScoped<ISearchNotificationService, SearchNotificationService>();
```
## Legacy Bug Fixes
### Bug 1: Hourly Lookback Uses Daily Values
**Legacy Issue**: In `ScheduleChecker.CheckConfigSchedule()` lines 110-114, hourly sync uses `lastDaily` and `config.DailyConfig.IntervalMinutes` for lookback calculation.
**Fix Required**: Update ScheduleChecker to use hourly values:
```csharp
// BEFORE (buggy):
var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);
// AFTER (fixed):
var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes);
```
### Bug 2: Unused Cleanup Code
**Legacy Issue**: `CloseOpenUpdateEntries()` was defined but never called.
**Fix**: WorkProcessor calls cleanup at startup via `StartupCleanupAsync()`:
- `IDataUpdateRepository.CloseOpenUpdateEntriesAsync()` for DataUpdate table
- `ISearchRepository.ResetPartialSearchesAsync()` for Search table
### Bug 3: Fire-and-Forget SignalR
**Legacy Issue**: `async void` methods with per-call `HubConnection` instances.
**Fix**:
- Use proper async/await with injected `IHubContext<StatusHub>`
- Interface in Core, implementation in Host/Api
- Error handling logs but doesn't throw (best-effort notifications)
- No Status property setter with side effects
## File Structure
### New Files
```
src/JdeScoping.Core/Interfaces/
├── ISearchProcessor.cs # Interface for search execution
└── ISearchNotificationService.cs # Interface for SignalR notifications
src/JdeScoping.Core/Models/
└── StatusUpdate.cs # DTO for status notifications (if not existing)
src/JdeScoping.DataSync/
├── WorkProcessor.cs # Main BackgroundService
├── Options/
│ └── WorkProcessorOptions.cs # Configuration options
├── Contracts/
│ ├── ISearchRepository.cs # Search table operations
│ └── ISearchExecutionService.cs # Search pipeline orchestration
└── Services/
├── SearchRepository.cs # ISearchRepository impl
└── SearchExecutionService.cs # ISearchExecutionService impl
src/JdeScoping.Host/ (or JdeScoping.Api/)
└── Services/
└── SearchNotificationService.cs # ISearchNotificationService impl
tests/JdeScoping.DataSync.Tests/
├── WorkProcessorTests.cs # Main service tests
└── Services/
├── SearchRepositoryTests.cs
└── SearchExecutionServiceTests.cs
```
### Modified Files
```
src/JdeScoping.Core/Interfaces/
└── IExcelExportService.cs # Existing - update signature if needed
src/JdeScoping.DataSync/
├── DependencyInjection.cs # Add WorkProcessor, remove DataSyncService
├── Options/DataSyncOptions.cs # Remove Enabled (moved to WorkProcessor)
└── Services/ScheduleChecker.cs # Fix hourly lookback bug
src/JdeScoping.DataAccess/
└── Services/SearchProcessor.cs # Implement ISearchProcessor interface
src/JdeScoping.Host/
├── Program.cs # Register SearchNotificationService
├── appsettings.json # Add WorkProcessor section
└── appsettings.Development.json # Add WorkProcessor section
```
### Removed Files
```
src/JdeScoping.DataSync/
└── DataSyncService.cs # Replaced by WorkProcessor
```
## Test Coverage
### WorkProcessorTests
- Service starts and stops gracefully
- Disabled via configuration skips processing
- Data sync priority: syncs block search processing
- Search processing: executes when no pending syncs
- Startup cleanup: calls CloseOpenUpdateEntries and ResetPartialSearches
- Purge: runs periodically every 24 hours
- Error handling: continues loop on exceptions
- Cancellation: responds to stopping token
### SearchRepositoryTests
- GetNextQueuedSearchAsync returns oldest queued search
- GetNextQueuedSearchAsync returns null when none queued
- ResetPartialSearchesAsync resets Running searches to Queued
- StartSearchAsync updates status and StartDT
- CompleteSearchAsync success path (Ended status, results stored)
- CompleteSearchAsync failure path (Error status, no results)
### SearchExecutionServiceTests
- ExecuteSearchAsync success: full pipeline completion
- ExecuteSearchAsync failure: error status set
- ExecuteSearchAsync timeout: handled gracefully, marked as error
- ExecuteSearchAsync shutdown: not marked as error, left for requeue
- Notifications sent on status changes
- Excel generation called with search results
### ScheduleCheckerTests (additional)
- Hourly lookback uses hourly timestamp and interval (bug fix verification)
## Atomic Search Claim (Concurrency Note)
If the system may run multiple instances, `GetNextQueuedSearchAsync` should atomically claim the search:
```sql
WITH NextSearch AS (
SELECT TOP 1 *
FROM dbo.Search
WHERE Status = 1 -- Queued
ORDER BY SubmitDT
)
UPDATE NextSearch
SET Status = 2, StartDT = GETUTCDATE()
OUTPUT INSERTED.*;
```
This ensures no two instances process the same search. The current design assumes single-instance; update if multi-instance is required.
File diff suppressed because it is too large Load Diff