# Protobuf Cache Conversion Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Convert development cache files from zstd-compressed JSON to zstd-compressed Protocol Buffers for faster deserialization and simpler code.

**Architecture:** Create a standalone converter tool to transform existing JSON files to protobuf format. Replace `JsonZstdFileSource` with `ProtobufZstdFileSource`, which uses protobuf-net-data's `DataSerializer.Deserialize()` to get an `IDataReader` directly. Remove schema definitions from DevEtl classes, since the protobuf-net-data format stores column names and types in the file itself.

**Tech Stack:** protobuf-net-data, ZstdSharp.Port, .NET 10

---

## Task 1: Create Converter Tool Project

**Files:**
- Create: `Tools/CacheConverter/CacheConverter.csproj`
- Create: `Tools/CacheConverter/Program.cs`

**Step 1: Create project directory**

```bash
mkdir -p Tools/CacheConverter
```

**Step 2: Create project file**

Create `Tools/CacheConverter/CacheConverter.csproj`:

```xml
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net10.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <!-- Add a Version attribute to each reference -->
    <PackageReference Include="protobuf-net-data" />
    <PackageReference Include="ZstdSharp.Port" />
  </ItemGroup>

</Project>
```

**Step 3: Create converter program**

Create `Tools/CacheConverter/Program.cs`:

```csharp
using System.Data;
using System.Text.Json;
using ProtoBuf.Data;
using ZstdSharp;

if (args.Length == 0)
{
    Console.WriteLine("Usage: CacheConverter <cache-directory>");
    Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES");
    return 1;
}

var cacheDir = args[0];
if (!Directory.Exists(cacheDir))
{
    Console.WriteLine($"Error: Directory not found: {cacheDir}");
    return 1;
}

var jsonFiles = Directory.GetFiles(cacheDir, "*.json.zstd");
Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert");

long totalOriginalSize = 0;
long totalNewSize = 0;

foreach (var jsonFile in jsonFiles)
{
    var baseName = Path.GetFileName(jsonFile).Replace(".json.zstd", "");
    var outputFile = Path.Combine(cacheDir, $"{baseName}.pb.zstd");

    Console.Write($"Converting {baseName}... ");

    try
    {
        var originalSize = new FileInfo(jsonFile).Length;
        totalOriginalSize += originalSize;

        // Read and decompress JSON
        using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read,
            FileShare.Read, 256 * 1024, FileOptions.SequentialScan);
        using var decompressStream = new DecompressionStream(inputFs);
        using var bufferedInput = new BufferedStream(decompressStream, 256 * 1024);

        // Parse JSON array into list of dictionaries
        var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        var records = JsonSerializer.Deserialize<List<Dictionary<string, JsonElement>>>(bufferedInput, jsonOptions)
            ?? throw new InvalidDataException("Failed to parse JSON array");

        if (records.Count == 0)
        {
            Console.WriteLine("SKIP (empty)");
            continue;
        }

        // Create DataTable from records
        var dataTable = CreateDataTable(records);

        // Write protobuf with zstd compression. The streams are scoped so that the
        // zstd frame is completed and written to disk before the size is measured.
        using (var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write,
            FileShare.None, 256 * 1024))
        using (var compressStream = new CompressionStream(outputFs, level: 3))
        using (var reader = dataTable.CreateDataReader())
        {
            DataSerializer.Serialize(compressStream, reader);
        }

        var newSize = new FileInfo(outputFile).Length;
        totalNewSize += newSize;

        var ratio = (double)newSize / originalSize * 100;
        Console.WriteLine($"OK ({originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"ERROR: {ex.Message}");
    }
}

Console.WriteLine();
if (totalOriginalSize > 0)
{
    Console.WriteLine($"Total: {totalOriginalSize:N0} -> {totalNewSize:N0} bytes ({(double)totalNewSize / totalOriginalSize * 100:F1}%)");
}
return 0;

static DataTable CreateDataTable(List<Dictionary<string, JsonElement>> records)
{
    var dt = new DataTable();
    var firstRecord = records[0];

    // Infer column types from first record
    foreach (var (key, value) in firstRecord)
    {
        var colType = InferType(value);
        dt.Columns.Add(key, colType);
    }

    // Add all rows
    foreach (var record in records)
    {
        var row = dt.NewRow();
        foreach (DataColumn col in dt.Columns)
        {
            if (record.TryGetValue(col.ColumnName, out var value))
            {
                row[col] = ConvertValue(value, col.DataType);
            }
            else
            {
                row[col] = DBNull.Value;
            }
        }
        dt.Rows.Add(row);
    }

    return dt;
}

static Type InferType(JsonElement element) => element.ValueKind switch
{
    // ISO 8601 strings become DateTime columns; a DateTime stored in a string
    // column would be flattened back to culture-dependent text.
    JsonValueKind.String when element.TryGetDateTime(out _) => typeof(DateTime),
    JsonValueKind.String => typeof(string),
    JsonValueKind.Number when element.TryGetInt64(out _) => typeof(long),
    JsonValueKind.Number => typeof(decimal),
    JsonValueKind.True or JsonValueKind.False => typeof(bool),
    JsonValueKind.Null => typeof(string), // Default nullable to string
    _ => typeof(string)
};

static object ConvertValue(JsonElement element, Type targetType)
{
    if (element.ValueKind == JsonValueKind.Null)
        return DBNull.Value;

    if (targetType == typeof(DateTime))
    {
        // Values that are not valid ISO 8601 dates are stored as NULL
        return element.ValueKind == JsonValueKind.String && element.TryGetDateTime(out var dt)
            ? (object)dt
            : DBNull.Value;
    }

    if (targetType == typeof(long))
        return element.GetInt64();
    if (targetType == typeof(decimal))
        return element.GetDecimal();
    if (targetType == typeof(bool))
        return element.GetBoolean();

    // String columns: a non-string value (possible when the first record held
    // null) is stored as its raw JSON text instead of throwing.
    if (element.ValueKind != JsonValueKind.String)
        return element.GetRawText();

    return (object?)element.GetString() ?? DBNull.Value;
}
```

**Step 4: Test the converter builds**

```bash
cd Tools/CacheConverter && dotnet build
```

Expected: Build succeeded.

**Step 5: Commit**

```bash
git add Tools/CacheConverter
git commit -m "feat: add protobuf cache converter tool"
```

---

## Task 2: Run Converter on Cache Files

**Step 1: Run converter**

```bash
cd Tools/CacheConverter
dotnet run -- ../../CACHED_DB_FILES
```

Expected: All 22 files convert successfully with size comparison output.
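Because the converter prints `ERROR` or `SKIP (empty)` per file and still exits with code 0, a per-file check can be more informative than a count. The following is a minimal sketch, not part of the original plan; `check_converted` is a hypothetical helper name, and it assumes the naming convention used by the converter (`name.json.zstd` becomes `name.pb.zstd` in the same directory).

```bash
# check_converted DIR
# Reports every *.json.zstd file in DIR that has no non-empty *.pb.zstd sibling.
# Returns 0 when every input has an output, 1 otherwise.
check_converted() {
  local dir="$1" missing=0 json pb
  for json in "$dir"/*.json.zstd; do
    [ -e "$json" ] || continue            # the glob matched nothing
    pb="${json%.json.zstd}.pb.zstd"       # same base name, new extension
    if [ ! -s "$pb" ]; then
      echo "missing or empty: $pb"
      missing=1
    fi
  done
  return "$missing"
}
```

From `Tools/CacheConverter` this would be called as `check_converted ../../CACHED_DB_FILES`. Inputs that the converter skipped as empty are also reported, since no output file is written for them.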

**Step 2: Verify output files exist**

```bash
ls -la ../../CACHED_DB_FILES/*.pb.zstd | wc -l
```

Expected: 22 files

**Step 3: Commit converted files (optional - they may be gitignored)**

If the files should be tracked:

```bash
git add ../../CACHED_DB_FILES/*.pb.zstd
git commit -m "data: convert cache files to protobuf format"
```

---

## Task 3: Add protobuf-net-data Package to DataSync.Dev

**Files:**
- Modify: `NEW/src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj`

**Step 1: Add package reference**

Add to the `JdeScoping.DataSync.Dev.csproj` ItemGroup:

```xml
<!-- Add a Version attribute; use the same version as Tools/CacheConverter -->
<PackageReference Include="protobuf-net-data" />
```

**Step 2: Restore and verify**

```bash
cd NEW && dotnet restore src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
```

Expected: Restore succeeded.

**Step 3: Commit**

```bash
git add NEW/src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
git commit -m "deps: add protobuf-net-data to DataSync.Dev"
```

---

## Task 4: Create ProtobufZstdFileSource

**Files:**
- Create: `NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs`

**Step 1: Create the source class**

Create `NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs`:

```csharp
using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
using ProtoBuf.Data;
using ZstdSharp;

namespace JdeScoping.DataSync.Dev.Sources;

/// <summary>
/// Import source that reads from a zstd-compressed protobuf file.
/// Uses protobuf-net-data for IDataReader deserialization.
/// </summary>
public sealed class ProtobufZstdFileSource : IImportSource
{
    private const int FileBufferSize = 256 * 1024;       // 256 KB
    private const int DecompressBufferSize = 256 * 1024; // 256 KB

    private readonly string _filePath;
    private FileStream? _fileStream;
    private DecompressionStream? _decompressionStream;
    private BufferedStream? _bufferedStream;
    private IDataReader? _reader;

    public string SourceName => $"Protobuf:{Path.GetFileName(_filePath)}";

    public ProtobufZstdFileSource(string filePath)
    {
        if (string.IsNullOrWhiteSpace(filePath))
            throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));

        if (!File.Exists(filePath))
            throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);

        _filePath = filePath;
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        if (_fileStream != null)
            throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");

        try
        {
            _fileStream = new FileStream(
                _filePath,
                FileMode.Open,
                FileAccess.Read,
                FileShare.Read,
                bufferSize: FileBufferSize,
                FileOptions.SequentialScan);

            _decompressionStream = new DecompressionStream(_fileStream);
            _bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize);

            // protobuf-net-data returns IDataReader directly!
            _reader = DataSerializer.Deserialize(_bufferedStream);

            return Task.FromResult(_reader);
        }
        catch
        {
            // Release any streams opened before the failure
            Cleanup();
            throw;
        }
    }

    private void Cleanup()
    {
        _reader?.Dispose();
        _bufferedStream?.Dispose();
        _decompressionStream?.Dispose();
        _fileStream?.Dispose();

        _reader = null;
        _bufferedStream = null;
        _decompressionStream = null;
        _fileStream = null;
    }

    public async ValueTask DisposeAsync()
    {
        // Dispose in reverse order of creation: reader first, file stream last
        if (_reader != null)
        {
            _reader.Dispose();
            _reader = null;
        }

        if (_bufferedStream != null)
        {
            await _bufferedStream.DisposeAsync();
            _bufferedStream = null;
        }

        if (_decompressionStream != null)
        {
            await _decompressionStream.DisposeAsync();
            _decompressionStream = null;
        }

        if (_fileStream != null)
        {
            await _fileStream.DisposeAsync();
            _fileStream = null;
        }
    }
}
```

**Step 2: Verify build**

```bash
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
```

Expected: Build succeeded.

**Step 3: Commit**

```bash
git add NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs
git commit -m "feat: add ProtobufZstdFileSource for reading protobuf cache files"
```

---

## Task 5: Update BranchDevEtl (First Migration)

**Files:**
- Modify: `NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs`

**Step 1: Update BranchDevEtl**

Replace contents of `NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs`:

```csharp
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Destinations;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Dev.Sources;

namespace JdeScoping.DataSync.Dev;

/// <summary>
/// Development ETL pipeline for the Branch table.
/// </summary>
public static class BranchDevEtl
{
    public static readonly string TableName = "Branch";
    public static readonly string CacheFileName = "branch.pb.zstd";

    public static EtlPipeline Create(IDbConnectionFactory connectionFactory, string cacheFilePath)
    {
        ArgumentNullException.ThrowIfNull(connectionFactory);

        if (string.IsNullOrWhiteSpace(cacheFilePath))
            throw new ArgumentException("Cache file path is required.", nameof(cacheFilePath));

        return new EtlPipelineBuilder()
            .WithName($"{TableName}_Dev")
            .WithSource(new ProtobufZstdFileSource(cacheFilePath))
            .WithDestination(new DbBulkImportDestination(connectionFactory, TableName))
            .Build();
    }
}
```

**Step 2: Verify build**

```bash
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
```

Expected: Build succeeded.

**Step 3: Run Branch test (if protobuf files exist)**

```bash
cd NEW && dotnet test tests/JdeScoping.DataSync.Dev.Tests --filter "FullyQualifiedName~BranchDevEtl" --verbosity normal
```

Expected: Tests pass (or skip if cache files don't exist yet).

**Step 4: Commit**

```bash
git add NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs
git commit -m "refactor: migrate BranchDevEtl to protobuf source"
```

---

## Task 6: Update Remaining DevEtl Files (Batch Migration)

**Files:**
- Modify: All remaining `*DevEtl.cs` files (20 files)

For each file, apply the same pattern:

1. Remove `using JdeScoping.DataSync.Dev.Models;` (no longer needed)
2. Remove `private static readonly JsonColumnSchema[] Schema = [...]`
3. Change `CacheFileName` from `.json.zstd` to `.pb.zstd`
4. Change `new JsonZstdFileSource(cacheFilePath, Schema)` to `new ProtobufZstdFileSource(cacheFilePath)`

**Step 1: Update all files**

Apply the pattern from Task 5 to each file:

| File | CacheFileName |
|------|---------------|
| `FunctionCodeDevEtl.cs` | `functioncode.pb.zstd` |
| `ItemDevEtl.cs` | `item.pb.zstd` |
| `JdeUserDevEtl.cs` | `jdeuser.pb.zstd` |
| `LotDevEtl.cs` | `lot.pb.zstd` |
| `LotUsageCurrDevEtl.cs` | `lotusage_curr.pb.zstd` |
| `LotUsageHistDevEtl.cs` | `lotusage_hist.pb.zstd` |
| `MisDataDevEtl.cs` | `misdata.pb.zstd` |
| `OrgHierarchyDevEtl.cs` | `orghierarchy.pb.zstd` |
| `ProfitCenterDevEtl.cs` | `profitcenter.pb.zstd` |
| `RouteMasterDevEtl.cs` | `routemaster.pb.zstd` |
| `WorkCenterDevEtl.cs` | `workcenter.pb.zstd` |
| `WorkOrderComponentCurrDevEtl.cs` | `workordercomponent_curr.pb.zstd` |
| `WorkOrderComponentHistDevEtl.cs` | `workordercomponent_hist.pb.zstd` |
| `WorkOrderCurrDevEtl.cs` | `workorder_curr.pb.zstd` |
| `WorkOrderHistDevEtl.cs` | `workorder_hist.pb.zstd` |
| `WorkOrderRoutingDevEtl.cs` | `workorderrouting.pb.zstd` |
| `WorkOrderStepCurrDevEtl.cs` | `workorderstep_curr.pb.zstd` |
| `WorkOrderStepHistDevEtl.cs` | `workorderstep_hist.pb.zstd` |
| `WorkOrderTimeCurrDevEtl.cs` | `workordertime_curr.pb.zstd` |
| `WorkOrderTimeHistDevEtl.cs` | `workordertime_hist.pb.zstd` |

**Step 2: Verify build**

```bash
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
```

Expected: Build succeeded.
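The single-line parts of the Task 6 pattern (items 1, 3 and 4) are mechanical and could be scripted. The sketch below is one possible way to do that, not part of the original plan; `migrate_devetl` is a hypothetical helper name. It assumes each file uses exactly the strings quoted in the pattern. The multi-line `Schema` array (item 2) is left for manual removal, so the build will fail on the unused `JsonColumnSchema` type until that is done.

```bash
# migrate_devetl FILE...
# Applies the single-line rewrites from the Task 6 pattern to each file in place.
migrate_devetl() {
  local f tmp
  for f in "$@"; do
    tmp="$(mktemp)"
    # 1. drop the Models using directive
    # 3. switch the cache file extension inside string literals
    # 4. swap the source constructor call
    sed -e '/^using JdeScoping\.DataSync\.Dev\.Models;/d' \
        -e 's/\.json\.zstd"/.pb.zstd"/' \
        -e 's/new JsonZstdFileSource(cacheFilePath, Schema)/new ProtobufZstdFileSource(cacheFilePath)/' \
        "$f" > "$tmp" && cat "$tmp" > "$f"   # write back without changing file permissions
    rm -f "$tmp"
  done
}
```

A call such as `migrate_devetl NEW/src/JdeScoping.DataSync.Dev/*DevEtl.cs` would cover all files; running it on an already migrated file such as `BranchDevEtl.cs` changes nothing. Reviewing the result with `git diff` before committing is advisable.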

**Step 3: Commit**

```bash
git add NEW/src/JdeScoping.DataSync.Dev/*DevEtl.cs
git commit -m "refactor: migrate all DevEtl files to protobuf source"
```

---

## Task 7: Delete Obsolete JSON Source Files

**Files:**
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs`
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs`
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs`
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs`

**Step 1: Delete files**

```bash
rm NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs
rm NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs
rm NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs
rm NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs
```

**Step 2: Remove the Models directory if it is now empty**

```bash
rmdir NEW/src/JdeScoping.DataSync.Dev/Models 2>/dev/null || true
```

**Step 3: Verify build**

```bash
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
```

Expected: Build succeeded.

**Step 4: Commit**

```bash
git add -A NEW/src/JdeScoping.DataSync.Dev/
git commit -m "chore: remove obsolete JSON source files"
```

---

## Task 8: Update DevEtlRegistry Comment

**Files:**
- Modify: `NEW/src/JdeScoping.DataSync.Dev/DevEtlRegistry.cs`

**Step 1: Update class comment**

Change lines 9-10 from:

```csharp
/// <summary>
/// Registry for development ETL pipelines that load from cached JSON files.
/// </summary>
```

To:

```csharp
/// <summary>
/// Registry for development ETL pipelines that load from cached protobuf files.
/// </summary>
```

**Step 2: Commit**

```bash
git add NEW/src/JdeScoping.DataSync.Dev/DevEtlRegistry.cs
git commit -m "docs: update DevEtlRegistry comment for protobuf"
```

---

## Task 9: Run All Tests

**Step 1: Build entire solution**

```bash
cd NEW && dotnet build
```

Expected: Build succeeded.
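A successful build shows that no code still references the deleted types, but it does not catch comments or string literals that still mention the old format, for example a test that hard-codes a `.json.zstd` file name. A quick search can surface those. This is a minimal sketch under that assumption; `find_json_leftovers` is a hypothetical helper name, and it relies on the `--include` option available in GNU and BSD `grep`.

```bash
# find_json_leftovers DIR
# Lists remaining mentions of the removed JSON types or the old cache extension
# in C# files under DIR. Returns 0 when nothing is found, 1 otherwise.
find_json_leftovers() {
  local dir="$1"
  # "JsonStreamingDataReader" also matches Utf8JsonStreamingDataReader
  if grep -rnE 'JsonZstdFileSource|JsonStreamingDataReader|JsonColumnSchema|\.json\.zstd' \
       --include='*.cs' "$dir"; then
    return 1
  fi
  return 0
}
```

From the repository root, `find_json_leftovers NEW` would scan both the source and test projects.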

**Step 2: Run DataSync.Dev tests**

```bash
cd NEW && dotnet test tests/JdeScoping.DataSync.Dev.Tests --verbosity normal
```

Expected: All tests pass (or skip if cache files don't exist).

---

## Task 10: Clean Up Old JSON Cache Files (Manual)

**After verifying everything works:**

```bash
rm CACHED_DB_FILES/*.json.zstd
```

This step is manual and should only be done after confirming the protobuf files work correctly.

---

## Summary

| Task | Description | Commit Message |
|------|-------------|----------------|
| 1 | Create converter tool | `feat: add protobuf cache converter tool` |
| 2 | Run converter | (optional data commit) |
| 3 | Add protobuf-net-data package | `deps: add protobuf-net-data to DataSync.Dev` |
| 4 | Create ProtobufZstdFileSource | `feat: add ProtobufZstdFileSource for reading protobuf cache files` |
| 5 | Update BranchDevEtl | `refactor: migrate BranchDevEtl to protobuf source` |
| 6 | Update remaining DevEtl files | `refactor: migrate all DevEtl files to protobuf source` |
| 7 | Delete obsolete JSON files | `chore: remove obsolete JSON source files` |
| 8 | Update registry comment | `docs: update DevEtlRegistry comment for protobuf` |
| 9 | Run all tests | (verification only) |
| 10 | Delete old cache files | (manual cleanup) |