# Development ETL Pipeline Design

## Purpose

Create development ETL pipelines that load data from cached `.json.zstd` files into the local SQL Server database. This enables local development and testing without requiring access to live Oracle/Sybase enterprise sources.

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                     JsonZstdFileSource                      │
├─────────────────────────────────────────────────────────────┤
│  File Path (.json.zstd)                                     │
│         │                                                   │
│         ▼                                                   │
│  ZstdSharp DecompressionStream                              │
│         │                                                   │
│         ▼                                                   │
│  JsonStreamingDataReader : IDataReader                      │
│         │                                                   │
│         ▼                                                   │
│  ETL Pipeline (transformers → destination)                  │
└─────────────────────────────────────────────────────────────┘
```

**Execution Flow:**

1. `JsonZstdFileSource` opens the `.json.zstd` file.
2. `ZstdSharp.DecompressionStream` decompresses the file on the fly.
3. `JsonStreamingDataReader` parses the JSON array, yielding one row at a time.
4. The ETL pipeline applies transformers and writes to SQL Server via bulk copy.

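
Steps 1 and 2 of the flow can be sketched as follows. This is a minimal illustration rather than the project's actual code: the `ZstdJsonOpener` class and `OpenDecompressed` method are hypothetical names, and the `leaveOpen` parameter follows the `ZstdSharp.Port` constructor as recalled here, so it is worth checking against the installed package version.

```csharp
using System.IO;
using ZstdSharp;

// Hypothetical helper (not part of the design above): opens a cache file
// and exposes its contents as a stream of decompressed UTF-8 JSON bytes.
public static class ZstdJsonOpener
{
    public static Stream OpenDecompressed(string filePath)
    {
        // Sequential, read-only access suits a single forward pass over the file.
        var file = new FileStream(
            filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
            64 * 1024, FileOptions.SequentialScan);

        // Assumption: leaveOpen: false makes disposing the returned stream
        // also close the underlying file stream.
        return new DecompressionStream(file, leaveOpen: false);
    }
}
```

The returned stream can then be handed to whatever reader parses the JSON array, so no decompressed copy of the file is ever written to disk or held in memory.
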
## Components

### JsonColumnSchema

Column metadata record used by the streaming reader:

```csharp
public record JsonColumnSchema(
    string Name,
    Type ClrType,
    bool IsNullable = true);
```

### JsonStreamingDataReader

Implements `IDataReader` to stream a JSON array without loading the whole document into memory:

```csharp
internal class JsonStreamingDataReader : IDataReader
{
    private readonly StreamReader _reader;
    private readonly JsonColumnSchema[] _schema;
    private readonly Dictionary<string, int> _nameToOrdinal;
    private object?[] _currentRow;

    public int FieldCount => _schema.Length;
    public string GetName(int ordinal) => _schema[ordinal].Name;
    public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType;
    public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value;

    public bool Read()
    {
        // Parse next JSON object from array
        // Map properties to _currentRow by ordinal
        // Return false at end of array
    }
}
```

**Key Design Decisions:**

- Uses `JsonDocument.ParseValue()` to read one object at a time, so memory use is bounded by a single row rather than the whole file
- Properties are mapped to schema columns by name (case-insensitive)
- Missing properties become `DBNull.Value`
- Extra JSON properties are ignored

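
The mapping rules above can be illustrated with a small sketch. It assumes each row has already been materialized as a `JsonElement`; `JsonRowMapper`, `BuildOrdinalMap`, and `MapRow` are hypothetical names, and the record is repeated only so the block is self-contained. Slots left as `null` correspond to the values that `GetValue` surfaces as `DBNull.Value`.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Mirrors the JsonColumnSchema record defined earlier in this document.
public record JsonColumnSchema(string Name, Type ClrType, bool IsNullable = true);

// Hypothetical helper illustrating the name-to-ordinal mapping rules.
public static class JsonRowMapper
{
    // Case-insensitive lookup from column name to ordinal.
    public static Dictionary<string, int> BuildOrdinalMap(JsonColumnSchema[] schema)
    {
        var map = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        for (var i = 0; i < schema.Length; i++) map[schema[i].Name] = i;
        return map;
    }

    public static object?[] MapRow(
        JsonElement obj,
        JsonColumnSchema[] schema,
        IReadOnlyDictionary<string, int> nameToOrdinal)
    {
        // Missing properties keep their default of null.
        var row = new object?[schema.Length];
        foreach (var property in obj.EnumerateObject())
        {
            // Extra JSON properties that are not in the schema are ignored.
            if (!nameToOrdinal.TryGetValue(property.Name, out var ordinal)) continue;
            row[ordinal] = ConvertValue(property.Value, schema[ordinal].ClrType);
        }
        return row;
    }

    private static object? ConvertValue(JsonElement value, Type clrType)
    {
        if (value.ValueKind == JsonValueKind.Null) return null;
        if (clrType == typeof(string)) return value.GetString();
        if (clrType == typeof(int)) return value.GetInt32();
        if (clrType == typeof(long)) return value.GetInt64();
        if (clrType == typeof(decimal)) return value.GetDecimal();
        if (clrType == typeof(DateTime)) return value.GetDateTime();
        if (clrType == typeof(bool)) return value.GetBoolean();
        if (clrType == typeof(byte[])) return value.GetBytesFromBase64();
        throw new NotSupportedException($"Unsupported column type '{clrType}'.");
    }
}
```

Whether binary columns are actually base64-encoded in the cache files is an assumption here; the conversion for `byte[]` would need to match however the files were exported.
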
### JsonZstdFileSource

Implements `IImportSource` for the ETL pipeline:

```csharp
public class JsonZstdFileSource : IImportSource
{
    private readonly string _filePath;
    private readonly JsonColumnSchema[] _schema;
    private FileStream? _fileStream;
    private DecompressionStream? _decompressionStream;

    public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}";

    public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema);

    public Task<IDataReader> ReadDataAsync(CancellationToken ct = default);
    public ValueTask DisposeAsync();
}
```

### DevEtlRegistry

Central registry for all development ETL pipelines:

```csharp
public class DevEtlRegistry
{
    private readonly IDbConnectionFactory _factory;
    private readonly string _cacheDirectory;

    public EtlPipeline GetPipeline(string tableName);
    public IEnumerable<string> GetAvailableTables();
    public Task<PipelineResult> RunAsync(string tableName, CancellationToken ct);
    public Task<IReadOnlyList<PipelineResult>> RunAllAsync(CancellationToken ct);
}
```

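
One way the lookup side of such a registry could work is sketched below. `DevCacheCatalog` is a hypothetical stand-in, not the `DevEtlRegistry` itself: it only resolves table names to cache file paths, and it assumes that a table counts as "available" when its cache file exists in the cache directory, which the design above does not state.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper: maps table names to cache files under a cache directory.
public sealed class DevCacheCatalog
{
    private readonly string _cacheDirectory;
    private readonly Dictionary<string, string> _cacheFiles =
        new(StringComparer.OrdinalIgnoreCase);

    public DevCacheCatalog(string cacheDirectory) => _cacheDirectory = cacheDirectory;

    // Associates a table with its cache file name, e.g. "Branch" -> "branch.json.zstd".
    public void Register(string tableName, string cacheFileName) =>
        _cacheFiles[tableName] = cacheFileName;

    // Resolves the full cache path, or throws if the table was never registered.
    public string GetCachePath(string tableName) =>
        _cacheFiles.TryGetValue(tableName, out var fileName)
            ? Path.Combine(_cacheDirectory, fileName)
            : throw new KeyNotFoundException($"No dev ETL registered for table '{tableName}'.");

    // Assumption: only tables whose cache file is present on disk are listed.
    public IEnumerable<string> GetAvailableTables() =>
        _cacheFiles
            .Where(entry => File.Exists(Path.Combine(_cacheDirectory, entry.Value)))
            .Select(entry => entry.Key)
            .OrderBy(name => name, StringComparer.OrdinalIgnoreCase);
}
```

Keeping the lookup case-insensitive means a caller can pass `branch` or `Branch` interchangeably, which tends to be convenient for command-line use.
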
### Per-Table ETL Classes

Each table has a static class with an explicit schema (generated by reading SQL scripts):

```csharp
public static class BranchDevEtl
{
    public static readonly string TableName = "Branch";
    public static readonly string CacheFileName = "branch.json.zstd";

    private static readonly JsonColumnSchema[] Schema = new[]
    {
        new JsonColumnSchema("Code", typeof(string)),
        new JsonColumnSchema("Description", typeof(string)),
        new JsonColumnSchema("LastUpdateDT", typeof(DateTime)),
    };

    public static EtlPipeline Create(IDbConnectionFactory factory, string cacheFilePath)
    {
        return new EtlPipelineBuilder()
            .WithName("Branch_Dev")
            .WithSource(new JsonZstdFileSource(cacheFilePath, Schema))
            .WithDestination(new DbBulkImportDestination(factory, "Branch"))
            .Build();
    }
}
```

## File Organization

```
NEW/src/JdeScoping.DataSync/
├── Etl/
│   ├── Sources/
│   │   ├── DbQuerySource.cs (existing)
│   │   ├── JsonZstdFileSource.cs (new)
│   │   └── JsonStreamingDataReader.cs (new)
│   └── Models/
│       └── JsonColumnSchema.cs (new)
│
├── DevEtl/
│   ├── DevEtlRegistry.cs (new)
│   ├── BranchDevEtl.cs (new)
│   ├── OrgHierarchyDevEtl.cs (new)
│   ├── WorkCenterDevEtl.cs (new)
│   ├── ProfitCenterDevEtl.cs (new)
│   ├── JdeUserDevEtl.cs (new)
│   ├── ItemDevEtl.cs (new)
│   ├── LotDevEtl.cs (new)
│   ├── FunctionCodeDevEtl.cs (new)
│   ├── RouteMasterDevEtl.cs (new)
│   ├── MisDataDevEtl.cs (new)
│   ├── WorkOrderCurrDevEtl.cs (new)
│   ├── WorkOrderHistDevEtl.cs (new)
│   ├── LotUsageCurrDevEtl.cs (new)
│   ├── LotUsageHistDevEtl.cs (new)
│   ├── WorkOrderTimeCurrDevEtl.cs (new)
│   ├── WorkOrderTimeHistDevEtl.cs (new)
│   ├── WorkOrderStepCurrDevEtl.cs (new)
│   ├── WorkOrderStepHistDevEtl.cs (new)
│   ├── WorkOrderComponentCurrDevEtl.cs (new)
│   ├── WorkOrderComponentHistDevEtl.cs (new)
│   └── WorkOrderRoutingDevEtl.cs (new)
```

## Dependencies

**New NuGet Package:**

- `ZstdSharp.Port` - Pure C# zstd decompression (no native dependencies)

## SQL Type to CLR Type Mapping

| SQL Type | CLR Type |
|----------|----------|
| `VARCHAR(n)`, `NVARCHAR(n)` | `string` |
| `INT` | `int` |
| `BIGINT` | `long` |
| `DECIMAL(p,s)`, `NUMERIC(p,s)` | `decimal` |
| `DATETIME`, `DATETIME2(n)` | `DateTime` |
| `BIT` | `bool` |
| `VARBINARY(n)` | `byte[]` |

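
The table above could be applied mechanically when generating a schema from SQL scripts. The following is a rough sketch under that assumption; `SqlTypeMapper` and `ToClrType` are hypothetical names, and only the types listed in the table are handled.

```csharp
using System;

// Hypothetical helper: translates a SQL Server column type into the CLR type
// listed in the mapping table.
public static class SqlTypeMapper
{
    public static Type ToClrType(string sqlType)
    {
        // Strip any length or precision suffix, e.g. "DECIMAL(18,4)" -> "DECIMAL".
        var paren = sqlType.IndexOf('(');
        var baseName = (paren >= 0 ? sqlType[..paren] : sqlType).Trim().ToUpperInvariant();

        return baseName switch
        {
            "VARCHAR" or "NVARCHAR" => typeof(string),
            "INT" => typeof(int),
            "BIGINT" => typeof(long),
            "DECIMAL" or "NUMERIC" => typeof(decimal),
            "DATETIME" or "DATETIME2" => typeof(DateTime),
            "BIT" => typeof(bool),
            "VARBINARY" => typeof(byte[]),
            // Fail loudly so an unmapped column is noticed at generation time.
            _ => throw new NotSupportedException($"No CLR mapping for SQL type '{sqlType}'."),
        };
    }
}
```

For example, `SqlTypeMapper.ToClrType("NVARCHAR(50)")` yields `typeof(string)`, which is the value that would go into the matching `JsonColumnSchema` entry.
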
## Cache File Inventory

| Table | Cache File | Size |
|-------|------------|------|
| Branch | branch.json.zstd | 930 B |
| OrgHierarchy | orghierarchy.json.zstd | 36 KB |
| WorkCenter | workcenter.json.zstd | 65 KB |
| ProfitCenter | profitcenter.json.zstd | 148 KB |
| JdeUser | jdeuser.json.zstd | 2.4 MB |
| FunctionCode | functioncode.json.zstd | 3.2 MB |
| Item | item.json.zstd | 17 MB |
| RouteMaster | routemaster.json.zstd | 20 MB |
| WorkOrder_Hist | workorder_hist.json.zstd | 41 MB |
| WorkOrder_Curr | workorder_curr.json.zstd | 86 MB |
| LotUsage_Hist | lotusage_hist.json.zstd | 146 MB |
| WorkOrderComponent_Hist | workordercomponent_hist.json.zstd | 148 MB |
| Lot | lot.json.zstd | 184 MB |
| MisData | misdata.json.zstd | 178 MB |
| WorkOrderStep_Hist | workorderstep_hist.json.zstd | 268 MB |
| WorkOrderComponent_Curr | workordercomponent_curr.json.zstd | 314 MB |
| WorkOrderRouting | workorderrouting.json.zstd | 324 MB |
| LotUsage_Curr | lotusage_curr.json.zstd | 400 MB |
| WorkOrderStep_Curr | workorderstep_curr.json.zstd | 507 MB |
| WorkOrderTime_Hist | workordertime_hist.json.zstd | 512 MB |
| WorkOrderTime_Curr | workordertime_curr.json.zstd | 879 MB |

**Note:** StatusCode has no cache file.

## Memory Considerations

The streaming approach ensures:

- Only one JSON object is held in memory at a time (~1-10 KB per row)
- The decompression buffer stays at ~64 KB
- Memory use does not grow with file size, so the approach suits every cache file, including the 879 MB `workordertime_curr.json.zstd`

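
To make the one-object-at-a-time behaviour concrete, here is a small sketch that walks a root-level JSON array without buffering the whole document. It deliberately swaps in `JsonSerializer.DeserializeAsyncEnumerable` (available in .NET 6 and later) instead of the `JsonDocument.ParseValue()` approach named in the design, because it works directly on a `Stream`; `JsonArrayStreamer` and its methods are hypothetical names.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: streams rows from a UTF-8 JSON array.
public static class JsonArrayStreamer
{
    // Lazily yields the elements of a root-level JSON array, one at a time.
    public static IAsyncEnumerable<JsonElement> StreamRowsAsync(
        Stream utf8Json, CancellationToken ct = default) =>
        JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(utf8Json, cancellationToken: ct);

    // Counts object rows while holding only the current element at any moment.
    public static async Task<long> CountRowsAsync(
        Stream utf8Json, CancellationToken ct = default)
    {
        long count = 0;
        await foreach (var row in StreamRowsAsync(utf8Json, ct))
        {
            if (row.ValueKind == JsonValueKind.Object) count++;
        }
        return count;
    }
}
```

Because `IDataReader.Read()` is synchronous, a reader built on this API would need some bridge between the async enumeration and the synchronous call, which is one reason the design may prefer a lower-level parser.
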
## Testing Strategy

1. Unit tests for `JsonStreamingDataReader` with small JSON samples
2. Integration test loading Branch (smallest) to validate end-to-end
3. Integration test loading WorkOrderTime_Curr (largest) to validate streaming