fix(data-access): correct self-referential SQL in WorkCenter filter
The WHERE clause was comparing Code to itself instead of the aliased table reference, which would always be true.
This commit is contained in:
@@ -0,0 +1,422 @@
|
||||
# ETL Performance Optimization Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Optimize dev ETL pipelines to significantly reduce load times for large tables (currently 18+ minutes for 88M rows)
|
||||
|
||||
**Architecture:** Four-pronged optimization: SqlBulkCopy tuning, parallel table loading, Utf8JsonReader parsing, and zstd buffer optimization
|
||||
|
||||
**Tech Stack:** .NET 10, System.Text.Json (Utf8JsonReader), System.Buffers (ArrayPool), SqlBulkCopy, ZstdSharp
|
||||
|
||||
---
|
||||
|
||||
## Task 1: SqlBulkCopy Performance Tuning
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs`
|
||||
|
||||
**Step 1: Add SqlBulkCopyOptions.TableLock**
|
||||
|
||||
TableLock acquires a bulk update lock on the table during the bulk copy operation, reducing logging overhead.
|
||||
|
||||
```csharp
|
||||
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null)
|
||||
{
|
||||
DestinationTableName = qualifiedName,
|
||||
BatchSize = _batchSize,
|
||||
BulkCopyTimeout = _commandTimeoutSeconds,
|
||||
EnableStreaming = true
|
||||
};
|
||||
```
|
||||
|
||||
**Step 2: Increase default batch size**
|
||||
|
||||
Change from 10,000 to 100,000 rows per batch to reduce round-trips.
|
||||
|
||||
```csharp
|
||||
private const int DefaultBatchSize = 100000;
|
||||
```
|
||||
|
||||
**Step 3: Reduce NotifyAfter frequency**
|
||||
|
||||
Currently fires 8,800+ times for 88M rows. Reduce to every 10 batches.
|
||||
|
||||
```csharp
|
||||
bulkCopy.NotifyAfter = _batchSize * 10;
|
||||
```
|
||||
|
||||
**Step 4: Add option for infinite timeout**
|
||||
|
||||
Add a constant for large table loads. A `BulkCopyTimeout` of 0 means the operation has no time limit.
|
||||
|
||||
```csharp
|
||||
private const int InfiniteTimeout = 0;
|
||||
```
|
||||
|
||||
**Step 5: Verify changes compile**
|
||||
|
||||
Run: `dotnet build NEW/src/JdeScoping.DataSync/`
|
||||
Expected: Build succeeded
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Parallel Loading in DevEtlRegistry
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync/DevEtl/DevEtlRegistry.cs`
|
||||
|
||||
**Step 1: Add RunAllParallelAsync method**
|
||||
|
||||
```csharp
|
||||
/// <summary>
/// Runs the pipeline for every available table: tables that are not classified as very large
/// are loaded concurrently, then the very large tables are loaded one at a time.
/// </summary>
/// <param name="maxDegreeOfParallelism">
/// Maximum number of small/medium table loads in flight at once. Must be at least 1.
/// </param>
/// <param name="cancellationToken">
/// Observed while waiting for a free slot and before each large-table load, and passed to every pipeline run.
/// </param>
/// <returns>
/// One result per table. Results are collected in a <see cref="ConcurrentBag{T}"/>, so their order is unspecified.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="maxDegreeOfParallelism"/> is zero or negative.
/// </exception>
public async Task<IReadOnlyList<PipelineResult>> RunAllParallelAsync(
    int maxDegreeOfParallelism = 4,
    CancellationToken cancellationToken = default)
{
    // A semaphore created with a count of zero would make every WaitAsync below block forever.
    ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxDegreeOfParallelism);

    var results = new ConcurrentBag<PipelineResult>();
    using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);

    // Snapshot the table list once so both partitions are derived from the same set.
    var allTables = GetAvailableTables().ToList();

    // Separate tables by size - run very large ones sequentially at the end
    var smallMediumTables = allTables.Where(t => !IsVeryLargeTable(t)).ToList();
    var veryLargeTables = allTables.Where(IsVeryLargeTable).ToList();

    // Run small/medium tables in parallel, throttled by the semaphore
    var tasks = smallMediumTables.Select(async tableName =>
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            var result = await RunAsync(tableName, cancellationToken);
            results.Add(result);
        }
        finally
        {
            semaphore.Release();
        }
    });

    // WhenAll completes only after every task has finished, so the semaphore
    // is no longer in use when it is disposed at the end of this method.
    await Task.WhenAll(tasks);

    // Run very large tables sequentially (IO-bound, would contend)
    foreach (var tableName in veryLargeTables)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var result = await RunAsync(tableName, cancellationToken);
        results.Add(result);
    }

    return results.ToList();
}
|
||||
|
||||
/// <summary>
/// Reports whether <paramref name="tableName"/> contains one of the name fragments
/// ("WorkOrderTime", "WorkOrderStep", "WorkOrderRouting"), compared case-insensitively.
/// </summary>
/// <param name="tableName">Table name to classify.</param>
/// <returns><c>true</c> when any fragment occurs in the name; otherwise <c>false</c>.</returns>
private static bool IsVeryLargeTable(string tableName)
{
    if (tableName.Contains("WorkOrderTime", StringComparison.OrdinalIgnoreCase))
    {
        return true;
    }

    if (tableName.Contains("WorkOrderStep", StringComparison.OrdinalIgnoreCase))
    {
        return true;
    }

    return tableName.Contains("WorkOrderRouting", StringComparison.OrdinalIgnoreCase);
}
|
||||
```
|
||||
|
||||
**Step 2: Add required using statements**
|
||||
|
||||
```csharp
|
||||
using System.Collections.Concurrent;
|
||||
```
|
||||
|
||||
**Step 3: Verify changes compile**
|
||||
|
||||
Run: `dotnet build NEW/src/JdeScoping.DataSync/`
|
||||
Expected: Build succeeded
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Utf8JsonReader-Based Streaming Parser
|
||||
|
||||
**Files:**
|
||||
- Create: `NEW/src/JdeScoping.DataSync/Etl/Sources/Utf8JsonStreamingDataReader.cs`
|
||||
- Modify: `NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs`
|
||||
|
||||
**Step 1: Create Utf8JsonStreamingDataReader**
|
||||
|
||||
This replaces the current char-by-char parsing with the more efficient `Utf8JsonReader`.
|
||||
|
||||
```csharp
|
||||
using System.Buffers;
|
||||
using System.Data;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using JdeScoping.DataSync.Etl.Models;
|
||||
|
||||
namespace JdeScoping.DataSync.Etl.Sources;
|
||||
|
||||
/// <summary>
|
||||
/// High-performance streaming JSON array reader using Utf8JsonReader.
|
||||
/// </summary>
|
||||
internal sealed class Utf8JsonStreamingDataReader : IDataReader
|
||||
{
|
||||
private const int DefaultBufferSize = 256 * 1024; // 256 KB
|
||||
|
||||
private readonly Stream _stream;
|
||||
private readonly JsonColumnSchema[] _schema;
|
||||
private readonly Dictionary<string, int> _nameToOrdinal;
|
||||
private readonly byte[][] _encodedColumnNames;
|
||||
private byte[] _buffer;
|
||||
private int _bytesInBuffer;
|
||||
private int _bytesConsumed;
|
||||
private JsonReaderState _readerState;
|
||||
private object?[] _currentRow;
|
||||
private bool _disposed;
|
||||
private bool _started;
|
||||
private bool _finished;
|
||||
|
||||
public Utf8JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema)
|
||||
{
|
||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||
_schema = schema ?? throw new ArgumentNullException(nameof(schema));
|
||||
_buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
|
||||
_currentRow = new object?[schema.Length];
|
||||
_readerState = new JsonReaderState();
|
||||
|
||||
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
||||
_encodedColumnNames = new byte[schema.Length][];
|
||||
for (int i = 0; i < schema.Length; i++)
|
||||
{
|
||||
_nameToOrdinal[schema[i].Name] = i;
|
||||
_encodedColumnNames[i] = Encoding.UTF8.GetBytes(schema[i].Name);
|
||||
}
|
||||
}
|
||||
|
||||
// ... IDataReader implementation
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Implement Read() with Utf8JsonReader**
|
||||
|
||||
Key difference: Parse directly from byte buffer, no string allocation per object.
|
||||
|
||||
```csharp
|
||||
/// <summary>
/// Advances to the next JSON object in the top-level array and loads its values into the current row.
/// </summary>
/// <returns>
/// <c>true</c> when a row was read; <c>false</c> once the array or the stream has ended,
/// or when the reader has been disposed.
/// </returns>
/// <exception cref="InvalidDataException">The underlying parser reported malformed JSON.</exception>
public bool Read()
{
    if (_disposed || _finished)
    {
        return false;
    }

    try
    {
        while (true)
        {
            // Each attempt starts from the last committed position and parser state; a failed
            // attempt commits nothing, so it is simply retried after more bytes are buffered.
            var reader = new Utf8JsonReader(
                _buffer.AsSpan(_bytesConsumed, _bytesInBuffer - _bytesConsumed),
                isFinalBlock: false,
                _readerState);

            if (TryReadNextObject(ref reader))
            {
                _bytesConsumed += (int)reader.BytesConsumed;
                _readerState = reader.CurrentState;
                return true;
            }

            // TryReadNextObject sets _finished when it reaches the closing ']';
            // there is nothing more to read, so do not refill and re-parse.
            if (_finished)
            {
                return false;
            }

            // Need more data
            if (!RefillBuffer())
            {
                // Final block: the stream is exhausted, so parse what is left as the end of input.
                reader = new Utf8JsonReader(
                    _buffer.AsSpan(_bytesConsumed, _bytesInBuffer - _bytesConsumed),
                    isFinalBlock: true,
                    _readerState);

                if (TryReadNextObject(ref reader))
                {
                    _bytesConsumed += (int)reader.BytesConsumed;
                    // Keep the parser state in step with the consumed bytes; a later call
                    // would otherwise resume with state from before this object.
                    _readerState = reader.CurrentState;
                    return true;
                }

                _finished = true;
                return false;
            }
        }
    }
    catch (JsonException ex)
    {
        throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex);
    }
}
|
||||
```
|
||||
|
||||
**Step 3: Implement TryReadNextObject with ValueTextEquals**
|
||||
|
||||
Use pre-encoded column names to avoid string allocations.
|
||||
|
||||
```csharp
|
||||
/// <summary>
/// Attempts to parse one complete JSON object from <paramref name="reader"/> into the current row.
/// </summary>
/// <param name="reader">Reader positioned at the last committed position of the buffered data.</param>
/// <returns>
/// <c>true</c> when a whole object was parsed. <c>false</c> when the buffered data ended before
/// the object was complete, or when the closing <c>]</c> was reached (in which case
/// <c>_finished</c> is also set).
/// </returns>
/// <exception cref="InvalidDataException">
/// The document does not start with an array, or an array element is not an object.
/// </exception>
private bool TryReadNextObject(ref Utf8JsonReader reader)
{
    // Running out of buffered bytes is not an error: the caller refills and retries.
    if (!reader.Read())
    {
        return false;
    }

    // The caller commits consumed bytes only when this method returns true, so the opening '['
    // is seen again on a retry. Recognise it by its depth instead of relying on _started alone.
    if (reader.TokenType == JsonTokenType.StartArray && reader.CurrentDepth == 0)
    {
        _started = true;

        if (!reader.Read())
        {
            return false;
        }
    }
    else if (!_started)
    {
        throw new InvalidDataException("Expected JSON array.");
    }

    if (reader.TokenType == JsonTokenType.EndArray)
    {
        _finished = true;
        return false;
    }

    if (reader.TokenType != JsonTokenType.StartObject)
    {
        throw new InvalidDataException($"Expected object, got {reader.TokenType}");
    }

    Array.Clear(_currentRow);

    while (true)
    {
        if (!reader.Read())
        {
            // The object continues past the buffered data.
            return false;
        }

        if (reader.TokenType == JsonTokenType.EndObject)
        {
            return true;
        }

        if (reader.TokenType != JsonTokenType.PropertyName)
        {
            continue;
        }

        // Find matching column using pre-encoded names (byte comparison, no string allocation)
        int ordinal = -1;
        for (int i = 0; i < _encodedColumnNames.Length; i++)
        {
            if (reader.ValueTextEquals(_encodedColumnNames[i]))
            {
                ordinal = i;
                break;
            }
        }

        // Move from the property name to its value.
        if (!reader.Read())
        {
            return false;
        }

        if (ordinal >= 0)
        {
            _currentRow[ordinal] = ParseValue(ref reader, _schema[ordinal].ClrType);
        }
        else if (!reader.TrySkip())
        {
            // Unmapped property holding an object or array that is not fully buffered yet.
            // Skipping the whole value keeps its nested tokens from being read as row fields.
            return false;
        }
    }
}
|
||||
```
|
||||
|
||||
**Step 4: Implement RefillBuffer**
|
||||
|
||||
```csharp
|
||||
/// <summary>
/// Compacts the unconsumed bytes to the start of the buffer and reads more data from the stream,
/// growing the buffer first when it is already full of unconsumed bytes.
/// </summary>
/// <returns><c>true</c> when at least one byte was read; <c>false</c> at end of stream.</returns>
private bool RefillBuffer()
{
    var remaining = _bytesInBuffer - _bytesConsumed;

    if (remaining == _buffer.Length)
    {
        // The buffer holds nothing but unconsumed data, i.e. one JSON value is larger than the
        // buffer. Reading into zero free bytes would return 0 and be mistaken for end of stream,
        // so move to a larger pooled buffer instead.
        var larger = ArrayPool<byte>.Shared.Rent(checked(_buffer.Length * 2));
        Buffer.BlockCopy(_buffer, _bytesConsumed, larger, 0, remaining);
        ArrayPool<byte>.Shared.Return(_buffer);
        _buffer = larger;
    }
    else if (remaining > 0)
    {
        // Move unconsumed data to start of buffer
        Buffer.BlockCopy(_buffer, _bytesConsumed, _buffer, 0, remaining);
    }

    _bytesInBuffer = remaining;
    _bytesConsumed = 0;

    // Read more data; free space is guaranteed here, so 0 really means end of stream.
    var bytesRead = _stream.Read(_buffer, _bytesInBuffer, _buffer.Length - _bytesInBuffer);
    if (bytesRead == 0)
    {
        return false;
    }

    _bytesInBuffer += bytesRead;
    return true;
}
|
||||
```
|
||||
|
||||
**Step 5: Update JsonZstdFileSource to use new reader**
|
||||
|
||||
Add constructor parameter to select reader implementation.
|
||||
|
||||
```csharp
|
||||
public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema, bool useHighPerformanceReader = true)
|
||||
{
|
||||
// ... existing validation ...
|
||||
_useHighPerformanceReader = useHighPerformanceReader;
|
||||
}
|
||||
|
||||
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
// ... existing setup ...
|
||||
|
||||
_reader = _useHighPerformanceReader
|
||||
? new Utf8JsonStreamingDataReader(bufferedStream, _schema)
|
||||
: new JsonStreamingDataReader(bufferedStream, _schema);
|
||||
|
||||
return Task.FromResult(_reader);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 6: Verify changes compile**
|
||||
|
||||
Run: `dotnet build NEW/src/JdeScoping.DataSync/`
|
||||
Expected: Build succeeded
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Zstd Buffer Optimizations
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs`
|
||||
|
||||
**Step 1: Open the file with SequentialScan and synchronous IO**
|
||||
|
||||
```csharp
|
||||
_fileStream = new FileStream(
|
||||
_filePath,
|
||||
FileMode.Open,
|
||||
FileAccess.Read,
|
||||
FileShare.Read,
|
||||
bufferSize: 256 * 1024, // 256 KB
|
||||
FileOptions.SequentialScan); // Hint for OS read-ahead
|
||||
```
|
||||
|
||||
**Step 2: Wrap DecompressionStream in BufferedStream**
|
||||
|
||||
```csharp
|
||||
_decompressionStream = new DecompressionStream(_fileStream);
|
||||
var bufferedStream = new BufferedStream(_decompressionStream, 256 * 1024);
|
||||
_reader = new Utf8JsonStreamingDataReader(bufferedStream, _schema);
|
||||
```
|
||||
|
||||
**Step 3: Verify changes compile**
|
||||
|
||||
Run: `dotnet build NEW/src/JdeScoping.DataSync/`
|
||||
Expected: Build succeeded
|
||||
|
||||
---
|
||||
|
||||
## Task 5: Integration Testing
|
||||
|
||||
**Files:**
|
||||
- Test: `NEW/tests/JdeScoping.DataSync.Tests/DevEtl/`
|
||||
|
||||
**Step 1: Run existing tests to ensure no regressions**
|
||||
|
||||
Run: `dotnet test NEW/tests/JdeScoping.DataSync.Tests/ --filter "DevEtl"`
|
||||
Expected: All tests pass
|
||||
|
||||
**Step 2: Test parallel loading**
|
||||
|
||||
Create a simple console app to test:
|
||||
```csharp
|
||||
var registry = new DevEtlRegistry(factory, cacheDir);
|
||||
var sw = Stopwatch.StartNew();
|
||||
var results = await registry.RunAllParallelAsync(maxDegreeOfParallelism: 4);
|
||||
Console.WriteLine($"Total time: {sw.Elapsed}");
|
||||
foreach (var r in results.OrderByDescending(r => r.TotalRows))
|
||||
Console.WriteLine($"{r.SourceName}: {r.TotalRows:N0} rows in {r.Elapsed}");
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Expected Performance Improvements
|
||||
|
||||
| Optimization | Expected Impact |
|
||||
|--------------|-----------------|
|
||||
| TableLock | 20-40% faster for large tables (reduced logging) |
|
||||
| Batch size 100k | 10-20% faster (fewer round-trips) |
|
||||
| Utf8JsonReader | 30-50% faster parsing (zero-alloc) |
|
||||
| Parallel loading | 2-3x faster for full load (4 parallel) |
|
||||
| Buffer optimizations | 5-10% faster IO |
|
||||
|
||||
**Combined estimate:** Full load should drop from ~45 minutes to ~15-20 minutes.
|
||||
Reference in New Issue
Block a user