Joseph Doherty 8630a5d32b docs: add WorkProcessor design and implementation documentation
- WorkProcessorReport.md: Analysis of legacy work processor from OLD solution
- Design document with clean architecture and component specifications
- Implementation plan with 15 TDD tasks
2026-01-07 06:30:54 -05:00


Work Processor Report

This document describes the background work processor from the legacy JDE Scoping Tool (LotFinder) WorkerService. The work processor coordinates data synchronization and search processing.

Executive Summary

The WorkProcessor class is the main orchestration component that runs as a Windows service (via Topshelf). It implements a continuous polling loop that:

  1. Checks if any data syncs are overdue
  2. If overdue syncs exist → performs data updates in parallel
  3. If all syncs are current → processes the next queued search
  4. Waits up to 5 seconds (returning early if a stop is signaled) and repeats

Key Design Principle: Data freshness takes priority over search processing. Searches are only processed when all data caches are up to date.


Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                        WorkProcessor                            │
│                     (Topshelf Service)                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌──────────────┐     ┌───────────────┐     ┌──────────────┐  │
│   │    Start()   │────▶│   DoWork()    │────▶│    Stop()    │  │
│   └──────────────┘     └───────┬───────┘     └──────────────┘  │
│                                │                                │
│              ┌─────────────────┼─────────────────┐              │
│              ▼                 ▼                 ▼              │
│   ┌──────────────────┐  ┌───────────┐  ┌─────────────────┐     │
│   │ UpdateProcessor  │  │  Search   │  │   SignalR Hub   │     │
│   │ (Data Sync)      │  │ Processor │  │ (Status Update) │     │
│   └──────────────────┘  └───────────┘  └─────────────────┘     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Source Files

| File | Purpose |
|------|---------|
| OLD/WorkerService/Process/WorkProcessor.cs | Main orchestration loop |
| OLD/WorkerService/Process/UpdateProcessor.cs | Data sync coordination |
| OLD/WorkerService/Process/UpdateProcessor.DataUpdateEntry.cs | DataUpdate table logging |
| OLD/WorkerService/Process/UpdateProcessor.TableManagement.cs | Staging/merge SQL generation |
| OLD/WorkerService/Process/LotFinderDBExt.cs | Search query execution |
| OLD/WorkerService/Process/ExcelWriter.cs | Excel result generation |
| OLD/WorkerService/Program.cs | Topshelf service configuration |

Main Work Loop

Constants

| Constant | Value | Description |
|----------|-------|-------------|
| WAIT_INTERVAL | 5000 ms (5 seconds) | Sleep duration between work cycles |

Loop Structure

Start()
  └── Thread spawned
        └── while(true)
              ├── DoWork()
              └── cancel.WaitOne(WAIT_INTERVAL)
                    └── if signaled → break

The loop runs indefinitely until Stop() is called, which sets the cancel ManualResetEvent.
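The loop structure above can be sketched as follows. This is a minimal illustration rather than the legacy source: the class name PollingLoop, the injected doWork delegate, and the configurable wait are assumptions made so the sketch is self-contained. Only Start, Stop, DoWork, WAIT_INTERVAL, and the cancel ManualResetEvent come from the report.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the legacy loop; not the original WorkProcessor.
public sealed class PollingLoop
{
    private const int WAIT_INTERVAL = 5000; // ms, as in the Constants table

    private readonly ManualResetEvent _cancel = new ManualResetEvent(false);
    private readonly Action _doWork;
    private readonly int _waitMs;
    private readonly Thread _thread;

    public PollingLoop(Action doWork, int waitMs = WAIT_INTERVAL)
    {
        _doWork = doWork;
        _waitMs = waitMs;
        _thread = new Thread(Run) { IsBackground = true };
    }

    public void Start() => _thread.Start();

    public void Stop()
    {
        _cancel.Set();                        // wakes the loop if it is waiting
        if (_thread.IsAlive) _thread.Join();  // let an in-flight cycle finish
    }

    private void Run()
    {
        while (true)
        {
            try { _doWork(); }
            catch (Exception ex) { Console.Error.WriteLine(ex); } // logged, swallowed

            // WaitOne returns true when the event is signaled, false on timeout.
            if (_cancel.WaitOne(_waitMs)) break;
        }
    }
}
```

Waiting on the event instead of calling Thread.Sleep means Stop() does not have to wait out the remainder of the interval.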


DoWork() Algorithm

The DoWork() method implements the following decision flow:

DoWork()
│
├─▶ GetPendingUpdateTasks()
│       └── Returns list of overdue data syncs
│
├─▶ IF pending updates exist:
│       ├── Status = "Updating data cache"
│       ├── Parallel.ForEach(pending, MaxDegreeOfParallelism=8)
│       │       └── UpdateProcessor.DoUpdate(task)
│       └── RETURN (skip search processing this cycle)
│
├─▶ ELSE (all data syncs current):
│       │
│       ├── ResetPartialSearches()
│       │       └── Reset any searches that started but never completed
│       │
│       ├── GetNextSearch()
│       │       └── Get oldest queued search (Status=1, ORDER BY SubmitDT)
│       │
│       └── IF search found:
│               ├── Status = "Processing search #{ID}"
│               ├── StartSearch(search)
│               │       └── Status → 2 (Started), set StartDT
│               ├── PublishSearchUpdate(search) → SignalR
│               │
│               ├── Execute search query
│               ├── Generate Excel results
│               ├── Save debug file: search_{ID}.xlsx
│               │
│               ├── CompleteSearch(search, success=true)
│               │       └── Status → 3 (Ended), store Results
│               └── PublishSearchUpdate(search) → SignalR
│
│               ON EXCEPTION:
│               ├── Log error
│               ├── CompleteSearch(search, success=false)
│               │       └── Status → 4 (Error)
│               └── PublishSearchUpdate(search) → SignalR
│
└── Status = "Idle"
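The decision flow above reduces to one rule: pending data syncs pre-empt search processing for the whole cycle. A minimal sketch of that rule follows. The WorkCycle class, its delegate parameters, and the string return values are hypothetical and exist only to keep the example self-contained; the branch order and the MaxDegreeOfParallelism value come from the flow above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical reduction of DoWork() to its branching logic.
public static class WorkCycle
{
    // Returns which branch the cycle took: "updated", "searched", or "idle".
    public static string Run(
        IReadOnlyList<string> pendingUpdates,  // overdue data syncs
        Action<string> doUpdate,               // stands in for UpdateProcessor.DoUpdate
        Func<int?> getNextSearchId,            // null when no search is queued
        Action<int> processSearch)
    {
        if (pendingUpdates.Count > 0)
        {
            Parallel.ForEach(
                pendingUpdates,
                new ParallelOptions { MaxDegreeOfParallelism = 8 },
                doUpdate);
            return "updated";                  // skip search processing this cycle
        }

        int? searchId = getNextSearchId();
        if (searchId.HasValue)
        {
            processSearch(searchId.Value);
            return "searched";
        }

        return "idle";
    }
}
```

Because the update branch returns early, a queued search waits at least one more cycle whenever any sync is overdue.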

Data Sync Scheduling

How "Overdue" Is Determined

The GetPendingUpdateTasks() method checks each configured data source against the LastDataUpdates view:

-- LastDataUpdates view returns the most recent successful update per table/type
SELECT TableName,
       COALESCE([3], '1970-01-01') AS MassUpdateDT,    -- UpdateType 3 = Mass
       COALESCE([2], '1970-01-01') AS DailyUpdateDT,   -- UpdateType 2 = Daily
       COALESCE([1], '1970-01-01') AS HourlyUpdateDT   -- UpdateType 1 = Hourly
FROM (pivot query on DataUpdate table)

Priority Order (Exclusive)

Only ONE update type is selected per data source per cycle:

| Priority | Condition | Update Type |
|----------|-----------|-------------|
| 1 | No record exists OR Now > MassUpdateDT + MassInterval | Mass (full reload) |
| 2 | Now > DailyUpdateDT + DailyInterval | Daily (incremental) |
| 3 | Now > HourlyUpdateDT + HourlyInterval | Hourly (incremental) |
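The priority rules can be expressed as a short first-match function. The sketch below is an assumption-laden illustration: UpdateScheduler and SelectUpdateType are hypothetical names, intervals are taken to be minutes, all three schedules are assumed to be configured, and a missing timestamp falls back to 1970-01-01 in the same way as the LastDataUpdates view.

```csharp
using System;

public enum UpdateType { Hourly = 1, Daily = 2, Mass = 3 } // values from the DataUpdate table

// Hypothetical helper; the legacy logic lives in GetPendingUpdateTasks().
public static class UpdateScheduler
{
    private static readonly DateTime Epoch = new DateTime(1970, 1, 1);

    // Returns the single update type due for a data source, or null if none is due.
    public static UpdateType? SelectUpdateType(
        DateTime now,
        DateTime? massUpdateDT, DateTime? dailyUpdateDT, DateTime? hourlyUpdateDT,
        int massInterval, int dailyInterval, int hourlyInterval)
    {
        DateTime mass = massUpdateDT ?? Epoch;     // no record behaves like "long overdue"
        DateTime daily = dailyUpdateDT ?? Epoch;
        DateTime hourly = hourlyUpdateDT ?? Epoch;

        // First match wins, so a due Mass update hides Daily and Hourly.
        if (now > mass.AddMinutes(massInterval)) return UpdateType.Mass;
        if (now > daily.AddMinutes(dailyInterval)) return UpdateType.Daily;
        if (now > hourly.AddMinutes(hourlyInterval)) return UpdateType.Hourly;
        return null;
    }
}
```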

Lookback Multiplier

For incremental updates (Daily/Hourly), the query uses a lookback of 3× the interval:

// Daily update example
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);
// If DailyInterval = 1440 (24 hours), lookback = 3 days

// Hourly update example (note: uses Daily values - see bug below)
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);

Purpose: The 3× multiplier provides overlap to catch any records that may have been missed due to timing edge cases or delays.

⚠️ Legacy Bug: In GetPendingUpdateTasks(), the Hourly update incorrectly uses DailyUpdateDT and DailyUpdateConfig.Interval instead of the hourly equivalents. As a result, hourly updates look back 3× the daily interval from the last daily update (3 days at a 1440-minute interval) instead of 3× the hourly interval from the last hourly update (3 hours at a 60-minute interval). The separate DoUpdate(tableName, updateType) overload correctly uses hourly values, but this path is not called from the main work loop.
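One way to avoid this class of bug is to compute the lookback in a single helper that takes the timestamp and interval of the schedule being run. The sketch below is illustrative only; Lookback and MinimumDT(...) are hypothetical names, and the interval is assumed to be in minutes, as in the examples above.

```csharp
using System;

// Hypothetical helper; not part of the legacy code.
public static class Lookback
{
    public const int Multiplier = 3; // the 3x overlap described above

    // lastUpdateDT and intervalMinutes must both belong to the same schedule
    // (hourly with hourly, daily with daily).
    public static DateTime MinimumDT(DateTime lastUpdateDT, int intervalMinutes)
        => lastUpdateDT.AddMinutes(-Multiplier * intervalMinutes);
}
```

For an hourly schedule, `Lookback.MinimumDT(hourlyUpdateDT, 60)` yields a timestamp 3 hours before the last hourly update; for a daily schedule, `Lookback.MinimumDT(dailyUpdateDT, 1440)` yields 3 days before the last daily update.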

Parallel Execution

Data updates run in parallel with controlled concurrency:

Parallel.ForEach(pending,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    task => UpdateProcessor.DoUpdate(task));

Data Update Process

Each DoUpdate(task) execution:

  1. Log Start: Insert DataUpdate record with NumberRecords = -2 (in-progress marker)
  2. Prepurge (if configured): TRUNCATE TABLE {destination}
  3. Fetch Data: Call the configured data fetch function (JDE/CMS query)
  4. Batch Processing (1M records per batch):
    • Create staging table (#Staging{table}) - local temp table
    • Create index on staging table (PK columns + LastUpdateDT/ReleaseDate)
    • Disable non-PK indexes on staging table
    • Bulk copy data to staging (batch size: 10,000)
    • Rebuild indexes on staging table
    • Create temp table (#{table}) with deduplication via ROW_NUMBER() OVER (PARTITION BY {PK} ORDER BY LastUpdateDT)
    • MERGE to destination table (only updates when TARGET.LastUpdateDT < SOURCE.LastUpdateDT)
  5. Post-Processing: Run configured action (e.g., PostProcessMisData)
  6. Reindex (if configured): Rebuild all non-PK indexes on destination
  7. Log End: Update DataUpdate record with WasSuccessful, NumberRecords
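The deduplicate-then-merge behavior in step 4 can be modeled in memory to show its effect on the destination. This sketch is not the generated SQL. The Row record, the StagingMerge class, and the dictionary standing in for the destination table are hypothetical. It assumes the newest row per key is the one kept (the report does not state the ROW_NUMBER() sort direction) and that unmatched keys are inserted. The record type requires C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: a primary key, a change timestamp, and a payload.
public sealed record Row(string Key, DateTime LastUpdateDT, string Payload);

public static class StagingMerge
{
    // Keep one row per key: the one with the latest LastUpdateDT (assumed direction).
    public static List<Row> Deduplicate(IEnumerable<Row> staging) =>
        staging.GroupBy(r => r.Key)
               .Select(g => g.OrderByDescending(r => r.LastUpdateDT).First())
               .ToList();

    // Insert unseen keys; overwrite an existing row only when the source is newer.
    // Returns the number of rows inserted or updated.
    public static int Merge(IDictionary<string, Row> target, IEnumerable<Row> source)
    {
        int changed = 0;
        foreach (Row row in Deduplicate(source))
        {
            if (!target.TryGetValue(row.Key, out Row existing) ||
                existing.LastUpdateDT < row.LastUpdateDT)
            {
                target[row.Key] = row;
                changed++;
            }
        }
        return changed;
    }
}
```

The timestamp guard is what makes the 3× lookback safe: rows fetched again by an overlapping window are not newer than what is already stored, so they are skipped.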

In-Progress Detection

Records with NumberRecords = -2 indicate updates that started but never completed (crash/restart).

Note: The codebase contains a CloseOpenUpdateEntries() method to clean up these records, but it is never called in the current implementation. This means crashed updates remain with NumberRecords = -2 indefinitely. The cleanup SQL exists but is unused:

-- Defined but never invoked
UPDATE dbo.DataUpdate
SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1
WHERE NumberRecords = -2
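For illustration, the effect of that cleanup can be modeled in memory, as it might run once at service start before the first work cycle. DataUpdateEntry here is a hypothetical stand-in for a dbo.DataUpdate row and DataUpdateCleanup is not a legacy class; the sketch also assumes a single worker instance, since with several instances an entry marked -2 could belong to an update that is still running elsewhere.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory shape of a dbo.DataUpdate row; only the columns the
// cleanup touches are modeled.
public sealed class DataUpdateEntry
{
    public DateTime? EndDT { get; set; }
    public bool WasSuccessful { get; set; }
    public long NumberRecords { get; set; }
}

public static class DataUpdateCleanup
{
    public const long InProgress = -2; // written when an update starts
    public const long Crashed = -1;    // written by the cleanup

    // Applies the same change as the cleanup SQL and returns how many
    // entries were closed.
    public static int CloseOpenEntries(IEnumerable<DataUpdateEntry> entries, DateTime now)
    {
        int closed = 0;
        foreach (DataUpdateEntry entry in entries)
        {
            if (entry.NumberRecords != InProgress) continue;
            entry.EndDT = now;
            entry.WasSuccessful = false;
            entry.NumberRecords = Crashed;
            closed++;
        }
        return closed;
    }
}
```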

Search Processing

Search Status Values

| Value | Enum | Description |
|-------|------|-------------|
| 0 | New | Created but not submitted |
| 1 | Submitted | Queued for processing |
| 2 | Started | Currently being processed |
| 3 | Ended | Completed successfully |
| 4 | Error | Failed with error |

Partial Search Recovery

Before processing new searches, the system resets any "orphaned" searches:

-- ResetPartialSearches stored procedure
UPDATE dbo.Search
SET Status = 1, StartDT = NULL
WHERE StartDT IS NOT NULL AND EndDT IS NULL

This handles cases where the service crashed mid-search.

Search Execution Flow

  1. GetNextSearch: Query for oldest Status = 1 search
  2. StartSearch: Update to Status = 2, set StartDT
  3. Execute Query: Run the generated SQL against local cache
  4. Generate Excel: Create Excel file using EPPlus
  5. CompleteSearch: Update Status = 3 (or 4 on error), store Results blob
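The status transitions in this flow, together with the partial-search reset, can be sketched as a small state machine. SearchRecord, SearchRunner, and the buildExcel delegate (standing in for query execution plus Excel generation) are hypothetical. The sketch also assumes EndDT is set on failure as well as on success; the report does not say so, but ResetPartialSearches would otherwise requeue failed searches.

```csharp
using System;

public enum SearchStatus : short { New = 0, Submitted = 1, Started = 2, Ended = 3, Error = 4 }

// Hypothetical in-memory shape of a dbo.Search row.
public sealed class SearchRecord
{
    public int ID { get; set; }
    public SearchStatus Status { get; set; } = SearchStatus.Submitted;
    public DateTime? StartDT { get; set; }
    public DateTime? EndDT { get; set; }
    public byte[] Results { get; set; } = Array.Empty<byte>();
}

public static class SearchRunner
{
    public static void Process(
        SearchRecord search, Func<SearchRecord, byte[]> buildExcel, Func<DateTime> clock)
    {
        search.Status = SearchStatus.Started;   // StartSearch
        search.StartDT = clock();
        try
        {
            search.Results = buildExcel(search);
            search.Status = SearchStatus.Ended; // CompleteSearch(success = true)
        }
        catch (Exception)
        {
            search.Status = SearchStatus.Error; // CompleteSearch(success = false)
        }
        search.EndDT = clock();                 // assumed to be set on both paths
    }

    // Mirrors ResetPartialSearches: a search that started but never finished
    // goes back to the queue.
    public static void ResetIfPartial(SearchRecord search)
    {
        if (search.StartDT != null && search.EndDT == null)
        {
            search.Status = SearchStatus.Submitted;
            search.StartDT = null;
        }
    }
}
```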

SignalR Integration

Status Updates

The Status property publishes changes to the SignalR hub:

public string Status
{
    set
    {
        if (!string.Equals(_status, value))
        {
            _status = value;
            logger.Info("Status: {0}", _status);
            PublishStatus(_status);  // → SignalR
        }
    }
}

Hub Methods

| Method | Payload | When Called |
|--------|---------|-------------|
| SetStatus | { Message, Timestamp } | On status property change |
| PublishSearchUpdate | SearchUpdate(search) | After StartSearch, CompleteSearch |

Connection Pattern

Each publish creates a new HubConnection:

string hubHost = ConfigurationManager.AppSettings["HubHost"];
using (HubConnection hubConnection = new HubConnection(hubHost))
{
    IHubProxy hubProxy = hubConnection.CreateHubProxy("StatusHub");
    await hubConnection.Start();
    await hubProxy.Invoke("SetStatus", update);
}

Note: This is an async void fire-and-forget pattern. Failures are logged but don't interrupt processing.
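A common alternative to async void is a Task-returning method that catches and logs its own failures, so callers can await it or deliberately discard the task. The sketch below shows that shape under stated assumptions: StatusPublisher is a hypothetical class, and the send delegate stands in for the hub invocation so the example does not depend on any SignalR package.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper; the transport is injected rather than created per call.
public sealed class StatusPublisher
{
    private readonly Func<string, Task> _send;      // e.g. wraps a hub invocation
    private readonly Action<Exception> _logError;

    public StatusPublisher(Func<string, Task> send, Action<Exception> logError)
    {
        _send = send;
        _logError = logError;
    }

    // Returns true when the message was sent, false when sending failed.
    // Failures are logged and never thrown, so processing is not interrupted.
    public async Task<bool> PublishAsync(string status)
    {
        try
        {
            await _send(status).ConfigureAwait(false);
            return true;
        }
        catch (Exception ex)
        {
            _logError(ex);
            return false;
        }
    }
}
```

Unlike async void, an exception that escapes this method would surface on the returned task instead of on the synchronization context or thread pool, where it can terminate the process.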


Timing Summary

| Event | Timing |
|-------|--------|
| Work loop interval | 5 seconds |
| Mass sync check | Every ~7 days (10080 minutes) |
| Daily sync check | Every ~24 hours (1440 minutes) |
| Hourly sync check | Every ~1 hour (60 minutes) |
| Query lookback (incremental) | 3× the interval |
| Data update parallelism | Up to 8 concurrent |
| Bulk copy batch size | 10,000 rows |
| Data batch grouping | 1,000,000 rows |

Database Tables

DataUpdate

Tracks all data sync operations:

CREATE TABLE dbo.DataUpdate (
    ID INT IDENTITY PRIMARY KEY,
    SourceSystem VARCHAR(50),     -- 'JDE', 'CMS'
    SourceData VARCHAR(50),       -- 'WORKORDER', 'LOTUSAGE', etc.
    TableName VARCHAR(50),        -- Destination table name
    StartDT DATETIME,
    EndDT DATETIME,
    UpdateType SMALLINT,          -- 1=Hourly, 2=Daily, 3=Mass
    WasSuccessful BIT,
    NumberRecords BIGINT          -- -2=in-progress, -1=crashed, N=count
)

Search

Tracks search requests and results:

CREATE TABLE dbo.Search (
    ID INT IDENTITY PRIMARY KEY,
    UserName VARCHAR(128),
    Name VARCHAR(128),
    Status SMALLINT,              -- 0-4 (see SearchStatus enum)
    SubmitDT DATETIME,
    StartDT DATETIME,
    EndDT DATETIME,
    Criteria VARCHAR(MAX),        -- JSON serialized search criteria
    Results VARBINARY(MAX)        -- Excel file bytes
)

Error Handling

| Scenario | Behavior |
|----------|----------|
| Data update failure | Logged, continues with next update |
| Search execution failure | Status set to Error (4), logged |
| SignalR publish failure | Logged, processing continues |
| Top-level DoWork exception | Logged, swallowed, loop continues |
| Service crash during update | DataUpdate record stays at NumberRecords = -2; CloseOpenUpdateEntries() exists but is never called, so nothing cleans it up on restart |
| Service crash during search | ResetPartialSearches() requeues on restart |

Configuration

App.config Settings

| Key | Purpose |
|-----|---------|
| HubHost | SignalR hub URL for status updates |
| querytimeout | SQL query timeout (default: 600 seconds) |

dsconfig/*.json

Each data source has a JSON configuration file defining:

  • Source system and data identifiers
  • Destination table name
  • Data fetch function reference
  • Schedule configurations (Mass/Daily/Hourly)

See DataSyncReport.md for complete configuration details.


Known Issues in Legacy Implementation

| Issue | Severity | Description | Location |
|-------|----------|-------------|----------|
| Hourly lookback bug | Medium | GetPendingUpdateTasks() uses DailyUpdateDT and DailyInterval for hourly updates instead of hourly equivalents. Results in 3-day lookback instead of 3 hours. | UpdateProcessor.cs:101-103 |
| Unused cleanup code | Low | CloseOpenUpdateEntries() is defined but never called. Crashed updates (NumberRecords = -2) are never cleaned up. | UpdateProcessor.DataUpdateEntry.cs:28 |
| Fire-and-forget SignalR | Low | PublishStatus and PublishSearchUpdate are async void methods. Failures don't propagate and new connections are created per-call. | WorkProcessor.cs:61-107 |

Key Differences from NEW Implementation

| Aspect | OLD (Legacy) | NEW (.NET 10) |
|--------|--------------|---------------|
| Hosting | Topshelf Windows Service | .NET BackgroundService |
| Loop pattern | Manual thread + sleep | IHostedService + Timer |
| Parallelism | Parallel.ForEach | Configurable via options |
| SignalR | Legacy ASP.NET SignalR | ASP.NET Core SignalR |
| Configuration | JSON files + App.config | appsettings.json + pipelines.json |
| Scheduling | Interval-based polling | Same pattern, configurable defaults |
| Hourly lookback | Uses daily values (bug) | Should use correct hourly values |
| Cleanup on start | Not implemented | Should clean up orphaned updates |