Development ETL Pipeline Design
Purpose
Create development ETL pipelines that load data from cached .json.zstd files into the local SQL Server database. This enables local development and testing without requiring access to live Oracle/Sybase enterprise sources.
Architecture
┌─────────────────────────────────────────────────────────────┐
│ JsonZstdFileSource │
├─────────────────────────────────────────────────────────────┤
│ File Path (.json.zstd) │
│ │ │
│ ▼ │
│ ZstdSharp DecompressionStream │
│ │ │
│ ▼ │
│ JsonStreamingDataReader : IDataReader │
│ │ │
│ ▼ │
│ ETL Pipeline (transformers → destination) │
└─────────────────────────────────────────────────────────────┘
Execution Flow:
1. JsonZstdFileSource opens the .json.zstd file
2. ZstdSharp.DecompressionStream decompresses on the fly
3. JsonStreamingDataReader parses the JSON array, yielding one row at a time
4. ETL pipeline applies transformers and writes to SQL Server via bulk copy
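The flow above can be sketched end to end as follows. This is a minimal illustration under stated assumptions, not the project's implementation: it swaps the synchronous JsonStreamingDataReader for `JsonSerializer.DeserializeAsyncEnumerable` from System.Text.Json (.NET 6 or later), and the names `JsonZstdStreaming` and `ReadRowsAsync` are hypothetical. It assumes each cache file contains a single top-level JSON array and that the ZstdSharp.Port package is referenced.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using ZstdSharp; // NuGet package: ZstdSharp.Port

// Hypothetical helper, not part of the design: streams the elements of a
// top-level JSON array out of a .json.zstd file one at a time.
public static class JsonZstdStreaming
{
    public static async IAsyncEnumerable<JsonElement> ReadRowsAsync(
        string filePath,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        // Step 1: open the compressed file.
        await using var file = new FileStream(
            filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 64 * 1024, useAsync: true);

        // Step 2: decompress on the fly; nothing is written to disk.
        await using var zstd = new DecompressionStream(file);

        // Step 3: yield one array element per iteration instead of
        // materializing the whole array.
        await foreach (var row in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
            zstd, cancellationToken: ct))
        {
            yield return row;
        }
    }
}
```

A caller would consume it with `await foreach (var row in JsonZstdStreaming.ReadRowsAsync(path)) { ... }`. Step 4 (transformers and bulk copy) is left out because it depends on pipeline types that this document does not define.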
Components
JsonColumnSchema
Column metadata record used by the streaming reader:
public record JsonColumnSchema(
    string Name,
    Type ClrType,
    bool IsNullable = true);
JsonStreamingDataReader
Implements IDataReader to stream a JSON array without loading the whole file into memory:
internal class JsonStreamingDataReader : IDataReader
{
    private readonly StreamReader _reader;
    private readonly JsonColumnSchema[] _schema;
    private readonly Dictionary<string, int> _nameToOrdinal;
    private object?[] _currentRow = Array.Empty<object?>();

    public int FieldCount => _schema.Length;
    public string GetName(int ordinal) => _schema[ordinal].Name;
    public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType;
    public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value;

    public bool Read()
    {
        // Parse next JSON object from array
        // Map properties to _currentRow by ordinal
        // Return false at end of array
        throw new NotImplementedException(); // body elided in this design sketch
    }

    // Constructor and remaining IDataReader members omitted for brevity
}
Key Design Decisions:
- Uses JsonDocument.ParseValue() to read one object at a time (memory efficient)
- Properties mapped to schema by name (case-insensitive)
- Missing properties become DBNull.Value
- Extra JSON properties are ignored
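The mapping rules above can be illustrated with a small sketch. The helper `JsonRowMapper` and its methods are hypothetical names, and the value conversions rest on assumptions this document does not confirm: dates are taken to be ISO 8601 strings and binary columns Base64 strings. Missing properties are left as null in the row array, which `GetValue` then surfaces as DBNull.Value.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Repeated here so the sketch is self-contained.
public record JsonColumnSchema(string Name, Type ClrType, bool IsNullable = true);

// Hypothetical helper illustrating the name-to-ordinal mapping rules.
public static class JsonRowMapper
{
    // Case-insensitive lookup from column name to ordinal.
    public static Dictionary<string, int> BuildOrdinals(JsonColumnSchema[] schema)
    {
        var map = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        for (var i = 0; i < schema.Length; i++) map[schema[i].Name] = i;
        return map;
    }

    public static object?[] MapRow(
        JsonElement obj, JsonColumnSchema[] schema, Dictionary<string, int> nameToOrdinal)
    {
        // Every slot starts as null, so properties missing from the JSON stay null.
        var row = new object?[schema.Length];
        foreach (var prop in obj.EnumerateObject())
        {
            // Extra JSON properties that are not in the schema are ignored.
            if (!nameToOrdinal.TryGetValue(prop.Name, out var ordinal)) continue;
            row[ordinal] = ConvertValue(prop.Value, schema[ordinal].ClrType);
        }
        return row;
    }

    private static object? ConvertValue(JsonElement value, Type clrType)
    {
        if (value.ValueKind == JsonValueKind.Null) return null;
        if (clrType == typeof(string)) return value.GetString();
        if (clrType == typeof(int)) return value.GetInt32();
        if (clrType == typeof(long)) return value.GetInt64();
        if (clrType == typeof(decimal)) return value.GetDecimal();
        if (clrType == typeof(DateTime)) return value.GetDateTime();   // assumes ISO 8601 text
        if (clrType == typeof(bool)) return value.GetBoolean();
        if (clrType == typeof(byte[])) return value.GetBytesFromBase64(); // assumes Base64 text
        throw new NotSupportedException($"No conversion for CLR type {clrType}.");
    }
}
```

Building the ordinal dictionary once per reader, rather than per row, keeps the per-row cost to one lookup per JSON property.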
JsonZstdFileSource
Implements IImportSource for the ETL pipeline:
// Signatures only; method bodies are omitted in this design
public class JsonZstdFileSource : IImportSource
{
    private readonly string _filePath;
    private readonly JsonColumnSchema[] _schema;
    private FileStream? _fileStream;
    private DecompressionStream? _decompressionStream;

    public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}";

    public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema);
    public Task<IDataReader> ReadDataAsync(CancellationToken ct = default);
    public ValueTask DisposeAsync();
}
DevEtlRegistry
Central registry for all development ETL pipelines:
// Signatures only; method bodies are omitted in this design
public class DevEtlRegistry
{
    private readonly IDbConnectionFactory _factory;
    private readonly string _cacheDirectory;

    public EtlPipeline GetPipeline(string tableName);
    public IEnumerable<string> GetAvailableTables();
    public Task<PipelineResult> RunAsync(string tableName, CancellationToken ct);
    public Task<IReadOnlyList<PipelineResult>> RunAllAsync(CancellationToken ct);
}
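One way the registry could resolve table names to cache files is sketched below. This is an assumption about the internals, not the documented design: `DevEtlCatalog` and `GetCacheFilePath` are hypothetical names, only three of the tables are listed, and the pipeline types (EtlPipeline, PipelineResult) are left out because this document does not define them.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical lookup layer that a registry could delegate to.
public sealed class DevEtlCatalog
{
    // Table name -> cache file name; lookups ignore case.
    private readonly Dictionary<string, string> _cacheFiles =
        new(StringComparer.OrdinalIgnoreCase)
        {
            ["Branch"] = "branch.json.zstd",
            ["OrgHierarchy"] = "orghierarchy.json.zstd",
            ["WorkOrderTime_Curr"] = "workordertime_curr.json.zstd",
        };

    private readonly string _cacheDirectory;

    public DevEtlCatalog(string cacheDirectory) => _cacheDirectory = cacheDirectory;

    // Full path of the cache file for a known table; throws for unknown tables.
    public string GetCacheFilePath(string tableName) =>
        _cacheFiles.TryGetValue(tableName, out var fileName)
            ? Path.Combine(_cacheDirectory, fileName)
            : throw new KeyNotFoundException($"No dev ETL registered for table '{tableName}'.");

    // Only tables whose cache file is actually present on disk.
    public IEnumerable<string> GetAvailableTables() =>
        _cacheFiles
            .Where(kv => File.Exists(Path.Combine(_cacheDirectory, kv.Value)))
            .Select(kv => kv.Key)
            .OrderBy(name => name, StringComparer.OrdinalIgnoreCase);
}
```

Filtering on file existence lets a developer work with a partial cache directory, for example only the small lookup tables.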
Per-Table ETL Classes
Each table has a static class with explicit schema (generated by reading SQL scripts):
public static class BranchDevEtl
{
    public static readonly string TableName = "Branch";
    public static readonly string CacheFileName = "branch.json.zstd";

    private static readonly JsonColumnSchema[] Schema = new[]
    {
        new JsonColumnSchema("Code", typeof(string)),
        new JsonColumnSchema("Description", typeof(string)),
        new JsonColumnSchema("LastUpdateDT", typeof(DateTime)),
    };

    public static EtlPipeline Create(IDbConnectionFactory factory, string cacheFilePath)
    {
        return new EtlPipelineBuilder()
            .WithName("Branch_Dev")
            .WithSource(new JsonZstdFileSource(cacheFilePath, Schema))
            .WithDestination(new DbBulkImportDestination(factory, "Branch"))
            .Build();
    }
}
File Organization
NEW/src/JdeScoping.DataSync/
├── Etl/
│ ├── Sources/
│ │ ├── DbQuerySource.cs (existing)
│ │ ├── JsonZstdFileSource.cs (new)
│ │ └── JsonStreamingDataReader.cs (new)
│ └── Models/
│ └── JsonColumnSchema.cs (new)
│
├── DevEtl/
│ ├── DevEtlRegistry.cs (new)
│ ├── BranchDevEtl.cs (new)
│ ├── OrgHierarchyDevEtl.cs (new)
│ ├── WorkCenterDevEtl.cs (new)
│ ├── ProfitCenterDevEtl.cs (new)
│ ├── JdeUserDevEtl.cs (new)
│ ├── ItemDevEtl.cs (new)
│ ├── LotDevEtl.cs (new)
│ ├── FunctionCodeDevEtl.cs (new)
│ ├── RouteMasterDevEtl.cs (new)
│ ├── MisDataDevEtl.cs (new)
│ ├── WorkOrderCurrDevEtl.cs (new)
│ ├── WorkOrderHistDevEtl.cs (new)
│ ├── LotUsageCurrDevEtl.cs (new)
│ ├── LotUsageHistDevEtl.cs (new)
│ ├── WorkOrderTimeCurrDevEtl.cs (new)
│ ├── WorkOrderTimeHistDevEtl.cs (new)
│ ├── WorkOrderStepCurrDevEtl.cs (new)
│ ├── WorkOrderStepHistDevEtl.cs (new)
│ ├── WorkOrderComponentCurrDevEtl.cs (new)
│ ├── WorkOrderComponentHistDevEtl.cs (new)
│ └── WorkOrderRoutingDevEtl.cs (new)
Dependencies
New NuGet Package:
- ZstdSharp.Port: pure C# zstd decompression (no native dependencies)
SQL Type to CLR Type Mapping
| SQL Type | CLR Type |
|---|---|
| VARCHAR(n), NVARCHAR(n) | string |
| INT | int |
| BIGINT | long |
| DECIMAL(p,s), NUMERIC(p,s) | decimal |
| DATETIME, DATETIME2(n) | DateTime |
| BIT | bool |
| VARBINARY(n) | byte[] |
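The table above could be expressed as a lookup when generating per-table schemas from SQL scripts. The following is a sketch only; `SqlTypeMap.ToClrType` is a hypothetical name, and it covers exactly the types listed in the table, throwing for anything else.

```csharp
using System;

// Hypothetical helper mirroring the SQL-to-CLR mapping table.
public static class SqlTypeMap
{
    public static Type ToClrType(string sqlType)
    {
        // Drop any length/precision suffix, e.g. "NVARCHAR(50)" -> "NVARCHAR".
        var paren = sqlType.IndexOf('(');
        var baseName = (paren >= 0 ? sqlType[..paren] : sqlType).Trim().ToUpperInvariant();

        return baseName switch
        {
            "VARCHAR" or "NVARCHAR" => typeof(string),
            "INT" => typeof(int),
            "BIGINT" => typeof(long),
            "DECIMAL" or "NUMERIC" => typeof(decimal),
            "DATETIME" or "DATETIME2" => typeof(DateTime),
            "BIT" => typeof(bool),
            "VARBINARY" => typeof(byte[]),
            _ => throw new NotSupportedException($"No CLR mapping for SQL type '{sqlType}'."),
        };
    }
}
```

Failing loudly on unmapped types means a new column type in the SQL scripts is caught at schema-generation time instead of surfacing later as a bulk copy conversion error.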
Cache File Inventory
| Table | Cache File | Size |
|---|---|---|
| Branch | branch.json.zstd | 930 B |
| OrgHierarchy | orghierarchy.json.zstd | 36 KB |
| WorkCenter | workcenter.json.zstd | 65 KB |
| ProfitCenter | profitcenter.json.zstd | 148 KB |
| JdeUser | jdeuser.json.zstd | 2.4 MB |
| FunctionCode | functioncode.json.zstd | 3.2 MB |
| Item | item.json.zstd | 17 MB |
| RouteMaster | routemaster.json.zstd | 20 MB |
| WorkOrder_Hist | workorder_hist.json.zstd | 41 MB |
| WorkOrder_Curr | workorder_curr.json.zstd | 86 MB |
| LotUsage_Hist | lotusage_hist.json.zstd | 146 MB |
| WorkOrderComponent_Hist | workordercomponent_hist.json.zstd | 148 MB |
| Lot | lot.json.zstd | 184 MB |
| MisData | misdata.json.zstd | 178 MB |
| WorkOrderStep_Hist | workorderstep_hist.json.zstd | 268 MB |
| WorkOrderComponent_Curr | workordercomponent_curr.json.zstd | 314 MB |
| WorkOrderRouting | workorderrouting.json.zstd | 324 MB |
| LotUsage_Curr | lotusage_curr.json.zstd | 400 MB |
| WorkOrderStep_Curr | workorderstep_curr.json.zstd | 507 MB |
| WorkOrderTime_Hist | workordertime_hist.json.zstd | 512 MB |
| WorkOrderTime_Curr | workordertime_curr.json.zstd | 879 MB |
Note: StatusCode has no cache file.
Memory Considerations
The streaming approach ensures:
- Only one JSON object in memory at a time (~1-10 KB per row)
- Decompression buffer ~64 KB
- Suitable for all file sizes including 879 MB workordertime_curr
Testing Strategy
- Unit tests for JsonStreamingDataReader with small JSON samples
- Integration test loading Branch (smallest) to validate end-to-end
- Integration test loading WorkOrderTime_Curr (largest) to validate streaming
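For the unit tests, small compressed samples can be built in memory instead of being checked in as binary files. The sketch below assumes the ZstdSharp.Port package, whose `CompressionStream` is the counterpart of the `DecompressionStream` used by the source; `ZstdTestSamples` is a hypothetical helper name, and no test framework is assumed.

```csharp
using System.IO;
using System.Text;
using ZstdSharp; // NuGet package: ZstdSharp.Port

// Hypothetical test helper for producing and checking tiny .json.zstd payloads.
public static class ZstdTestSamples
{
    // Compresses a JSON string into zstd bytes entirely in memory.
    public static byte[] Compress(string json)
    {
        var buffer = new MemoryStream();
        using (var zstd = new CompressionStream(buffer))
        {
            var bytes = Encoding.UTF8.GetBytes(json);
            zstd.Write(bytes, 0, bytes.Length);
        } // disposing the stream writes the end of the zstd frame

        // ToArray remains valid even if the MemoryStream has been closed.
        return buffer.ToArray();
    }

    // Decompresses zstd bytes back into text, e.g. to verify a round trip.
    public static string Decompress(byte[] compressed)
    {
        using var input = new MemoryStream(compressed);
        using var zstd = new DecompressionStream(input);
        using var reader = new StreamReader(zstd, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```

A reader test could wrap `new MemoryStream(ZstdTestSamples.Compress(sampleJson))` in a `DecompressionStream` and assert on the rows produced, covering cases such as missing properties, extra properties, and explicit nulls.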