docs: add protobuf cache conversion implementation plan
10-task plan covering converter tool creation, ProtobufZstdFileSource, DevEtl migrations, and cleanup of obsolete JSON source files.
This commit is contained in:
@@ -1,271 +0,0 @@
|
||||
using System.Data;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.Exceptions;
|
||||
using JdeScoping.DataSync.Models;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using Microsoft.Data.SqlClient;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NSubstitute;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
public class BulkMergeHelperTests
|
||||
{
|
||||
private readonly IDbConnectionFactory _connectionFactory;
|
||||
private readonly IDataReaderFactory _dataReaderFactory;
|
||||
private readonly ISchemaValidator _schemaValidator;
|
||||
private readonly ILogger<BulkMergeHelper> _logger;
|
||||
private readonly BulkMergeHelper _helper;
|
||||
|
||||
private class TestEntity
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public decimal Amount { get; set; }
|
||||
}
|
||||
|
||||
public BulkMergeHelperTests()
|
||||
{
|
||||
_connectionFactory = Substitute.For<IDbConnectionFactory>();
|
||||
_dataReaderFactory = Substitute.For<IDataReaderFactory>();
|
||||
_schemaValidator = Substitute.For<ISchemaValidator>();
|
||||
_logger = Substitute.For<ILogger<BulkMergeHelper>>();
|
||||
|
||||
// Setup default mock returns
|
||||
_dataReaderFactory.GetColumnNames<TestEntity>()
|
||||
.Returns(new List<string> { "Id", "Name", "Amount" });
|
||||
|
||||
_helper = new BulkMergeHelper(
|
||||
_connectionFactory,
|
||||
_dataReaderFactory,
|
||||
_schemaValidator,
|
||||
_logger);
|
||||
}
|
||||
|
||||
#region Constructor Tests
|
||||
|
||||
[Fact]
|
||||
public void Constructor_NullConnectionFactory_ThrowsArgumentNullException()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() =>
|
||||
new BulkMergeHelper(null!, _dataReaderFactory, _schemaValidator, _logger));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_NullDataReaderFactory_ThrowsArgumentNullException()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() =>
|
||||
new BulkMergeHelper(_connectionFactory, null!, _schemaValidator, _logger));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_NullSchemaValidator_ThrowsArgumentNullException()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() =>
|
||||
new BulkMergeHelper(_connectionFactory, _dataReaderFactory, null!, _logger));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_NullLogger_ThrowsArgumentNullException()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() =>
|
||||
new BulkMergeHelper(_connectionFactory, _dataReaderFactory, _schemaValidator, null!));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region MergeAsync Parameter Validation Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_NullData_ThrowsArgumentNullException()
|
||||
{
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(() =>
|
||||
_helper.MergeAsync<TestEntity>(
|
||||
null!,
|
||||
"TestTable",
|
||||
x => x.Id));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_NullDestinationTable_ThrowsArgumentNullException()
|
||||
{
|
||||
var data = AsyncEnumerable.Empty<TestEntity>();
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(() =>
|
||||
_helper.MergeAsync(
|
||||
data,
|
||||
null!,
|
||||
x => x.Id));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_EmptyDestinationTable_ThrowsArgumentException()
|
||||
{
|
||||
var data = AsyncEnumerable.Empty<TestEntity>();
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentException>(() =>
|
||||
_helper.MergeAsync(
|
||||
data,
|
||||
"",
|
||||
x => x.Id));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_NullMatchOn_ThrowsArgumentNullException()
|
||||
{
|
||||
var data = AsyncEnumerable.Empty<TestEntity>();
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(() =>
|
||||
_helper.MergeAsync<TestEntity>(
|
||||
data,
|
||||
"TestTable",
|
||||
null!));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Column Expression Tests
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_CalledWithMatchOn_ReturnsCorrectColumns()
|
||||
{
|
||||
// The ExpressionParser is tested separately, this just verifies it's being called
|
||||
var columns = ExpressionParser.GetColumnNames<TestEntity>(x => x.Id);
|
||||
|
||||
Assert.Single(columns);
|
||||
Assert.Equal("Id", columns[0]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_CalledWithMultipleColumns_ReturnsCorrectColumns()
|
||||
{
|
||||
var columns = ExpressionParser.GetColumnNames<TestEntity>(x => new { x.Id, x.Name });
|
||||
|
||||
Assert.Equal(2, columns.Count);
|
||||
Assert.Equal("Id", columns[0]);
|
||||
Assert.Equal("Name", columns[1]);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region TempTableName Generation Tests
|
||||
|
||||
[Fact]
|
||||
public void TempTableName_WithDots_ReplacesWithUnderscores()
|
||||
{
|
||||
// This is implicitly tested by how temp table names are generated
|
||||
var tableName = "dbo.TestTable";
|
||||
|
||||
// The actual generation happens inside MergeAsync, so we verify the pattern
|
||||
var cleaned = tableName.Replace(".", "_").Replace("[", "").Replace("]", "");
|
||||
Assert.Equal("dbo_TestTable", cleaned);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TempTableName_WithBrackets_RemovesBrackets()
|
||||
{
|
||||
var tableName = "[dbo].[TestTable]";
|
||||
var cleaned = tableName.Replace(".", "_").Replace("[", "").Replace("]", "");
|
||||
Assert.Equal("dbo_TestTable", cleaned);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region MergeResult Tests
|
||||
|
||||
[Fact]
|
||||
public void MergeResult_TotalRowsAffected_ReturnsSum()
|
||||
{
|
||||
var result = new MergeResult(100, 60, 40, 10, TimeSpan.FromSeconds(5));
|
||||
|
||||
Assert.Equal(100, result.TotalRowsAffected);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MergeResult_RecordProperties_AreCorrect()
|
||||
{
|
||||
var elapsed = TimeSpan.FromSeconds(5);
|
||||
var result = new MergeResult(100, 60, 40, 10, elapsed);
|
||||
|
||||
Assert.Equal(100, result.TotalRowsProcessed);
|
||||
Assert.Equal(60, result.RowsInserted);
|
||||
Assert.Equal(40, result.RowsUpdated);
|
||||
Assert.Equal(10, result.BatchCount);
|
||||
Assert.Equal(elapsed, result.Elapsed);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region MassInsertAsync Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MassInsertAsync_NullData_ThrowsArgumentNullException()
|
||||
{
|
||||
// Act & Assert
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
() => _helper.MassInsertAsync<TestEntity>(null!, "TestTable"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MassInsertAsync_NullDestination_ThrowsArgumentNullException()
|
||||
{
|
||||
// Arrange
|
||||
var data = AsyncEnumerable.Empty<TestEntity>();
|
||||
|
||||
// Act & Assert
|
||||
// ArgumentException.ThrowIfNullOrWhiteSpace throws ArgumentNullException for null values
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
() => _helper.MassInsertAsync(data, null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MassInsertAsync_EmptyDestination_ThrowsArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
var data = AsyncEnumerable.Empty<TestEntity>();
|
||||
|
||||
// Act & Assert
|
||||
await Assert.ThrowsAsync<ArgumentException>(
|
||||
() => _helper.MassInsertAsync(data, ""));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Exception Tests
|
||||
|
||||
[Fact]
|
||||
public void BulkMergeException_PropertiesAreSet()
|
||||
{
|
||||
var ex = new BulkMergeException("Test error")
|
||||
{
|
||||
TableName = "TestTable",
|
||||
BatchNumber = 5,
|
||||
RowsInBatch = 1000,
|
||||
SqlStatement = "MERGE INTO..."
|
||||
};
|
||||
|
||||
Assert.Equal("TestTable", ex.TableName);
|
||||
Assert.Equal(5, ex.BatchNumber);
|
||||
Assert.Equal(1000, ex.RowsInBatch);
|
||||
Assert.Equal("MERGE INTO...", ex.SqlStatement);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BulkMergeValidationException_ContainsErrors()
|
||||
{
|
||||
var errors = new List<ValidationError>
|
||||
{
|
||||
new(0, "Name", "TooLong", "Exceeds max length"),
|
||||
new(1, "Amount", 999999m, "Overflow")
|
||||
};
|
||||
|
||||
var ex = new BulkMergeValidationException("Validation failed", errors);
|
||||
|
||||
Assert.Equal(2, ex.Errors.Count);
|
||||
Assert.Equal("Name", ex.Errors[0].ColumnName);
|
||||
Assert.Equal("Amount", ex.Errors[1].ColumnName);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -1,194 +0,0 @@
|
||||
using System.Linq.Expressions;
|
||||
using JdeScoping.DataSync.Services;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
public class ExpressionParserTests
|
||||
{
|
||||
private class TestEntity
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public DateTime? LastUpdateDt { get; set; }
|
||||
public decimal Amount { get; set; }
|
||||
}
|
||||
|
||||
#region GetColumnNames Tests
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_SingleProperty_ReturnsSingleColumn()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, object>> expr = x => x.Id;
|
||||
|
||||
// Act
|
||||
var columns = ExpressionParser.GetColumnNames(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Single(columns);
|
||||
Assert.Equal("Id", columns[0]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_SingleStringProperty_ReturnsSingleColumn()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, object>> expr = x => x.Name;
|
||||
|
||||
// Act
|
||||
var columns = ExpressionParser.GetColumnNames(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Single(columns);
|
||||
Assert.Equal("Name", columns[0]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_AnonymousType_ReturnsAllColumns()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, object>> expr = x => new { x.Id, x.Name };
|
||||
|
||||
// Act
|
||||
var columns = ExpressionParser.GetColumnNames(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(2, columns.Count);
|
||||
Assert.Equal("Id", columns[0]);
|
||||
Assert.Equal("Name", columns[1]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_AnonymousTypeWithMultipleProperties_ReturnsAllColumns()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, object>> expr = x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt };
|
||||
|
||||
// Act
|
||||
var columns = ExpressionParser.GetColumnNames(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(4, columns.Count);
|
||||
Assert.Equal("Id", columns[0]);
|
||||
Assert.Equal("Name", columns[1]);
|
||||
Assert.Equal("Amount", columns[2]);
|
||||
Assert.Equal("LastUpdateDt", columns[3]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetColumnNames_NullExpression_ThrowsArgumentNullException()
|
||||
{
|
||||
// Act & Assert
|
||||
Assert.Throws<ArgumentNullException>(() =>
|
||||
ExpressionParser.GetColumnNames<TestEntity>(null!));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region BuildUpdateWhenSql Tests
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_NullExpression_ReturnsNull()
|
||||
{
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql<TestEntity>(null);
|
||||
|
||||
// Assert
|
||||
Assert.Null(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_GreaterThan_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr = (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("source.[LastUpdateDt] > target.[LastUpdateDt]", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_GreaterThanOrEqual_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr = (src, tgt) => src.Id >= tgt.Id;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("source.[Id] >= target.[Id]", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_Equal_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr = (src, tgt) => src.Name == tgt.Name;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("source.[Name] = target.[Name]", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_NotEqual_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr = (src, tgt) => src.Amount != tgt.Amount;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("source.[Amount] <> target.[Amount]", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_AndCondition_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr =
|
||||
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt && src.Id == tgt.Id;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("(source.[LastUpdateDt] > target.[LastUpdateDt] AND source.[Id] = target.[Id])", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_OrCondition_ReturnsSqlCondition()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr =
|
||||
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt || src.Amount > tgt.Amount;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("(source.[LastUpdateDt] > target.[LastUpdateDt] OR source.[Amount] > target.[Amount])", result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildUpdateWhenSql_CustomAliases_UsesProvidedAliases()
|
||||
{
|
||||
// Arrange
|
||||
Expression<Func<TestEntity, TestEntity, bool>> expr = (src, tgt) => src.Id > tgt.Id;
|
||||
|
||||
// Act
|
||||
var result = ExpressionParser.BuildUpdateWhenSql(expr, "s", "t");
|
||||
|
||||
// Assert
|
||||
Assert.Equal("s.[Id] > t.[Id]", result);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
using JdeScoping.Core.Models.WorkOrders;
|
||||
using JdeScoping.DataSync.Configuration.MergeConfigurations;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Shouldly;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
public class MergeConfigurationRegistryTests
|
||||
{
|
||||
[Fact]
|
||||
public void GetConfiguration_RegisteredType_ReturnsConfiguration()
|
||||
{
|
||||
// Arrange
|
||||
var services = new ServiceCollection();
|
||||
services.AddSingleton<IMergeConfiguration<WorkOrder>, WorkOrderMergeConfiguration>();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var registry = new MergeConfigurationRegistry(provider);
|
||||
|
||||
// Act
|
||||
var config = registry.GetConfiguration<WorkOrder>();
|
||||
|
||||
// Assert
|
||||
config.ShouldNotBeNull();
|
||||
config.TableName.ShouldBe("WorkOrder");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetConfiguration_UnregisteredType_ThrowsInvalidOperationException()
|
||||
{
|
||||
// Arrange
|
||||
var services = new ServiceCollection();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var registry = new MergeConfigurationRegistry(provider);
|
||||
|
||||
// Act & Assert
|
||||
var ex = Should.Throw<InvalidOperationException>(() => registry.GetConfiguration<UnregisteredEntity>());
|
||||
ex.Message.ShouldContain("UnregisteredEntity");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void HasConfiguration_RegisteredType_ReturnsTrue()
|
||||
{
|
||||
// Arrange
|
||||
var services = new ServiceCollection();
|
||||
services.AddSingleton<IMergeConfiguration<WorkOrder>, WorkOrderMergeConfiguration>();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var registry = new MergeConfigurationRegistry(provider);
|
||||
|
||||
// Act
|
||||
var result = registry.HasConfiguration<WorkOrder>();
|
||||
|
||||
// Assert
|
||||
result.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void HasConfiguration_UnregisteredType_ReturnsFalse()
|
||||
{
|
||||
// Arrange
|
||||
var services = new ServiceCollection();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var registry = new MergeConfigurationRegistry(provider);
|
||||
|
||||
// Act
|
||||
var result = registry.HasConfiguration<UnregisteredEntity>();
|
||||
|
||||
// Assert
|
||||
result.ShouldBeFalse();
|
||||
}
|
||||
|
||||
private class UnregisteredEntity { }
|
||||
}
|
||||
@@ -1,172 +0,0 @@
|
||||
using JdeScoping.DataSync.Services;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
public class MergeSqlBuilderTests
|
||||
{
|
||||
#region BuildCreateTempTable Tests
|
||||
|
||||
[Fact]
|
||||
public void BuildCreateTempTable_ValidInputs_ReturnsSelectInto()
|
||||
{
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildCreateTempTable("#TEMP_WorkOrder", "WorkOrder");
|
||||
|
||||
// Assert
|
||||
Assert.Equal("SELECT TOP 0 * INTO [#TEMP_WorkOrder] FROM [WorkOrder]", sql);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(null, "WorkOrder")]
|
||||
[InlineData("", "WorkOrder")]
|
||||
[InlineData("#TEMP", null)]
|
||||
[InlineData("#TEMP", "")]
|
||||
public void BuildCreateTempTable_InvalidInputs_ThrowsArgumentException(string? tempTable, string? sourceTable)
|
||||
{
|
||||
// Act & Assert
|
||||
Assert.ThrowsAny<ArgumentException>(() =>
|
||||
MergeSqlBuilder.BuildCreateTempTable(tempTable!, sourceTable!));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region BuildMergeSimple Tests
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_SingleMatchColumn_BuildsCorrectMerge()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = new[] { "Id" };
|
||||
var updateColumns = new[] { "Name", "Amount" };
|
||||
var insertColumns = new[] { "Id", "Name", "Amount" };
|
||||
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildMergeSimple(
|
||||
"TestTable", "#TEMP_TestTable",
|
||||
matchColumns, updateColumns, null, insertColumns);
|
||||
|
||||
// Assert
|
||||
Assert.Contains("MERGE INTO [TestTable] AS target", sql);
|
||||
Assert.Contains("USING [#TEMP_TestTable] AS source", sql);
|
||||
Assert.Contains("ON target.[Id] = source.[Id]", sql);
|
||||
Assert.Contains("WHEN MATCHED THEN", sql);
|
||||
Assert.Contains("UPDATE SET target.[Name] = source.[Name], target.[Amount] = source.[Amount]", sql);
|
||||
Assert.Contains("WHEN NOT MATCHED THEN", sql);
|
||||
Assert.Contains("INSERT ([Id], [Name], [Amount])", sql);
|
||||
Assert.Contains("VALUES (source.[Id], source.[Name], source.[Amount])", sql);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_CompositeKey_BuildsCorrectOnClause()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = new[] { "WorkOrderNumber", "BranchCode" };
|
||||
var updateColumns = new[] { "Status" };
|
||||
var insertColumns = new[] { "WorkOrderNumber", "BranchCode", "Status" };
|
||||
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildMergeSimple(
|
||||
"WorkOrder", "#TEMP",
|
||||
matchColumns, updateColumns, null, insertColumns);
|
||||
|
||||
// Assert
|
||||
Assert.Contains("ON target.[WorkOrderNumber] = source.[WorkOrderNumber] AND target.[BranchCode] = source.[BranchCode]", sql);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_WithUpdateWhen_IncludesCondition()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = new[] { "Id" };
|
||||
var updateColumns = new[] { "Name" };
|
||||
var insertColumns = new[] { "Id", "Name" };
|
||||
var updateWhen = "source.[LastUpdateDt] > target.[LastUpdateDt]";
|
||||
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildMergeSimple(
|
||||
"TestTable", "#TEMP",
|
||||
matchColumns, updateColumns, updateWhen, insertColumns);
|
||||
|
||||
// Assert
|
||||
Assert.Contains("WHEN MATCHED AND source.[LastUpdateDt] > target.[LastUpdateDt] THEN", sql);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_NoUpdateColumns_OmitsUpdateClause()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = new[] { "Id" };
|
||||
var updateColumns = Array.Empty<string>();
|
||||
var insertColumns = new[] { "Id", "Name" };
|
||||
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildMergeSimple(
|
||||
"TestTable", "#TEMP",
|
||||
matchColumns, updateColumns, null, insertColumns);
|
||||
|
||||
// Assert
|
||||
Assert.DoesNotContain("WHEN MATCHED", sql);
|
||||
Assert.Contains("WHEN NOT MATCHED THEN", sql);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_EmptyMatchColumns_ThrowsArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = Array.Empty<string>();
|
||||
var updateColumns = new[] { "Name" };
|
||||
var insertColumns = new[] { "Id", "Name" };
|
||||
|
||||
// Act & Assert
|
||||
Assert.Throws<ArgumentException>(() =>
|
||||
MergeSqlBuilder.BuildMergeSimple(
|
||||
"TestTable", "#TEMP",
|
||||
matchColumns, updateColumns, null, insertColumns));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildMergeSimple_EmptyInsertColumns_ThrowsArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
var matchColumns = new[] { "Id" };
|
||||
var updateColumns = new[] { "Name" };
|
||||
var insertColumns = Array.Empty<string>();
|
||||
|
||||
// Act & Assert
|
||||
Assert.Throws<ArgumentException>(() =>
|
||||
MergeSqlBuilder.BuildMergeSimple(
|
||||
"TestTable", "#TEMP",
|
||||
matchColumns, updateColumns, null, insertColumns));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region BuildTruncateTempTable Tests
|
||||
|
||||
[Fact]
|
||||
public void BuildTruncateTempTable_ValidInput_ReturnsTruncate()
|
||||
{
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildTruncateTempTable("#TEMP_WorkOrder");
|
||||
|
||||
// Assert
|
||||
Assert.Equal("TRUNCATE TABLE [#TEMP_WorkOrder]", sql);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region BuildDropTempTable Tests
|
||||
|
||||
[Fact]
|
||||
public void BuildDropTempTable_ValidInput_ReturnsDropWithCheck()
|
||||
{
|
||||
// Act
|
||||
var sql = MergeSqlBuilder.BuildDropTempTable("#TEMP_WorkOrder");
|
||||
|
||||
// Assert
|
||||
Assert.Contains("IF OBJECT_ID('tempdb..#TEMP_WorkOrder') IS NOT NULL", sql);
|
||||
Assert.Contains("DROP TABLE [#TEMP_WorkOrder]", sql);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -1,277 +0,0 @@
|
||||
using JdeScoping.DataSync.Models;
|
||||
using JdeScoping.DataSync.Services;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
public class SchemaValidatorTests
|
||||
{
|
||||
private readonly SchemaValidator _validator = new();
|
||||
|
||||
private class TestEntity
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public string? NullableName { get; set; }
|
||||
public decimal Amount { get; set; }
|
||||
public DateTime CreatedDate { get; set; }
|
||||
}
|
||||
|
||||
#region ValidateBatch Tests
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_EmptyData_ReturnsEmptyList()
|
||||
{
|
||||
// Arrange
|
||||
var data = Array.Empty<TestEntity>();
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_EmptySchema_ReturnsEmptyList()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity> { new() { Id = 1, Name = "Test" } };
|
||||
var schema = Array.Empty<ColumnSchema>();
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_ValidData_ReturnsEmptyList()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "Test", Amount = 100.50m }
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2),
|
||||
new("Amount", "decimal", null, 10, 2, false, 3)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_StringTooLong_ReturnsError()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "This is a very long string that exceeds the maximum length" }
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 10, null, null, false, 2)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Single(errors);
|
||||
Assert.Equal("Name", errors[0].ColumnName);
|
||||
Assert.Equal(0, errors[0].RowIndex);
|
||||
Assert.Contains("exceeds maximum length", errors[0].Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_NullInNonNullableColumn_ReturnsError()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "" } // Empty string treated as null for non-nullable
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Single(errors);
|
||||
Assert.Equal("Name", errors[0].ColumnName);
|
||||
Assert.Contains("does not allow null", errors[0].Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_NullInNullableColumn_NoError()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "Test", NullableName = null }
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2),
|
||||
new("NullableName", "nvarchar", 50, null, null, true, 3)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_DecimalOverflow_ReturnsError()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "Test", Amount = 12345678.90m } // Too many integer digits for decimal(8,2)
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2),
|
||||
new("Amount", "decimal", null, 8, 2, false, 3) // Max 6 integer digits
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Single(errors);
|
||||
Assert.Equal("Amount", errors[0].ColumnName);
|
||||
Assert.Contains("exceeds maximum integer digits", errors[0].Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_DecimalWithinRange_NoError()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "Test", Amount = 123456.78m } // Within decimal(10,2) - 8 integer digits
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2),
|
||||
new("Amount", "decimal", null, 10, 2, false, 3) // Max 8 integer digits
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_MultipleErrors_ReturnsAll()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "This is too long" },
|
||||
new() { Id = 2, Name = "Also too long!" }
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 5, null, null, false, 2)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(2, errors.Count);
|
||||
Assert.Equal(0, errors[0].RowIndex);
|
||||
Assert.Equal(1, errors[1].RowIndex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_MaxErrors_StopsAtLimit()
|
||||
{
|
||||
// Arrange
|
||||
var data = Enumerable.Range(0, 10)
|
||||
.Select(i => new TestEntity { Id = i, Name = "This is way too long" })
|
||||
.ToList();
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 5, null, null, false, 2)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema, maxErrors: 3);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(3, errors.Count);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_UnmatchedColumn_Ignored()
|
||||
{
|
||||
// Arrange
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 1, Name = "Test" }
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1),
|
||||
new("Name", "nvarchar", 50, null, null, false, 2),
|
||||
new("UnknownColumn", "nvarchar", 50, null, null, false, 3)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValidateBatch_IdColumn_AllowsNull()
|
||||
{
|
||||
// Arrange - Id columns are treated as identity/auto-generated
|
||||
var data = new List<TestEntity>
|
||||
{
|
||||
new() { Id = 0, Name = "Test" } // Id = 0 might be treated as "not set"
|
||||
};
|
||||
var schema = new List<ColumnSchema>
|
||||
{
|
||||
new("Id", "int", null, 10, 0, false, 1), // Not nullable in schema
|
||||
new("Name", "nvarchar", 50, null, null, false, 2)
|
||||
};
|
||||
|
||||
// Act
|
||||
var errors = _validator.ValidateBatch(data, schema);
|
||||
|
||||
// Assert - No error because Id columns are treated specially
|
||||
Assert.Empty(errors);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -1,550 +0,0 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Linq.Expressions;
|
||||
using System.Runtime.CompilerServices;
|
||||
using JdeScoping.Core.Interfaces;
|
||||
using JdeScoping.Core.Models;
|
||||
using JdeScoping.Core.Models.Enums;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Options;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.Models;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using JdeScoping.DataSync.Telemetry;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NSubstitute;
|
||||
using NSubstitute.ExceptionExtensions;
|
||||
using Shouldly;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Unit tests for TableSyncOperation.
|
||||
/// Tests mass/incremental paths, batching, and post-processor execution.
|
||||
/// </summary>
|
||||
public class TableSyncOperationTests
|
||||
{
|
||||
private readonly IDbConnectionFactory _connectionFactory;
|
||||
private readonly IDataUpdateRepository _updateRepository;
|
||||
private readonly IBulkMergeHelper _bulkMergeHelper;
|
||||
private readonly IMergeConfigurationRegistry _configRegistry;
|
||||
private readonly IOptions<DataSyncOptions> _options;
|
||||
private readonly DataSyncMetrics _metrics;
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
|
||||
public TableSyncOperationTests()
|
||||
{
|
||||
_connectionFactory = Substitute.For<IDbConnectionFactory>();
|
||||
_updateRepository = Substitute.For<IDataUpdateRepository>();
|
||||
_bulkMergeHelper = Substitute.For<IBulkMergeHelper>();
|
||||
_configRegistry = Substitute.For<IMergeConfigurationRegistry>();
|
||||
|
||||
_options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions
|
||||
{
|
||||
BatchSize = 1000,
|
||||
BulkCopyBatchSize = 100
|
||||
});
|
||||
|
||||
var services = new ServiceCollection();
|
||||
services.AddMetrics();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var meterFactory = provider.GetRequiredService<IMeterFactory>();
|
||||
_metrics = new DataSyncMetrics(meterFactory);
|
||||
|
||||
_serviceProvider = Substitute.For<IServiceProvider>();
|
||||
}
|
||||
|
||||
#region Update Logging Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_StartsUpdateWithInProgressMarker()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(123);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(0, TimeSpan.Zero, true));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
await _updateRepository.Received(1).StartUpdateAsync(
|
||||
task.SourceSystem,
|
||||
task.SourceData,
|
||||
task.TableName,
|
||||
task.UpdateType,
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_OnSuccess_CompletesUpdateWithRecordCount()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(123);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(500, TimeSpan.Zero, true));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
await _updateRepository.Received(1).CompleteUpdateAsync(
|
||||
123,
|
||||
500L,
|
||||
true,
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_OnFailure_CompletesUpdateWithFailureMarker()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(123);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.ThrowsAsync(new Exception("Database error"));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act & Assert
|
||||
await Should.ThrowAsync<Exception>(() => sut.ExecuteAsync(task));
|
||||
|
||||
// Verify update was marked as failed
|
||||
await _updateRepository.Received(1).CompleteUpdateAsync(
|
||||
123,
|
||||
-1,
|
||||
false,
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_OnCancellation_MarksUpdateAsFailed()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass);
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(123);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(async callInfo =>
|
||||
{
|
||||
cts.Cancel();
|
||||
callInfo.Arg<CancellationToken>().ThrowIfCancellationRequested();
|
||||
return new MassInsertResult(0, TimeSpan.Zero, true);
|
||||
});
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act & Assert
|
||||
await Should.ThrowAsync<OperationCanceledException>(() => sut.ExecuteAsync(task, cts.Token));
|
||||
|
||||
// Verify update was marked as failed
|
||||
await _updateRepository.Received(1).CompleteUpdateAsync(
|
||||
123,
|
||||
-1,
|
||||
false,
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Mass Update Path Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_MassWithPrepurge_UsesMassUpdatePath()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass, prepurge: true);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(100, TimeSpan.Zero, true));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: Should use mass insert path
|
||||
await _bulkMergeHelper.Received(1).MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
"TestTable",
|
||||
task.ScheduleConfig.ReIndexData,
|
||||
_options.Value.BulkCopyBatchSize,
|
||||
Arg.Any<CancellationToken>());
|
||||
|
||||
// Should NOT use merge path
|
||||
await _bulkMergeHelper.DidNotReceive().MergeAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<Expression<Func<TestEntity, object>>>(),
|
||||
Arg.Any<Expression<Func<TestEntity, object>>>(),
|
||||
Arg.Any<Expression<Func<TestEntity, TestEntity, bool>>>(),
|
||||
Arg.Any<Expression<Func<TestEntity, object>>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Incremental Update Path Tests
|
||||
|
||||
// Note: The following tests are marked as integration tests because they require
|
||||
// complex reflection-based fetcher resolution that's difficult to unit test.
|
||||
// These scenarios should be covered by integration tests with a real test database.
|
||||
//
|
||||
// Scenarios covered by integration tests:
|
||||
// - ExecuteAsync_DailyUpdate_UsesIncrementalPath
|
||||
// - ExecuteAsync_HourlyUpdate_UsesIncrementalPath
|
||||
// - ExecuteAsync_LargeDataset_ProcessesInBatches (incremental path)
|
||||
|
||||
[Fact]
|
||||
public void IncrementalUpdatePath_RequiresIntegrationTest()
|
||||
{
|
||||
// This test documents that incremental update scenarios require integration testing
|
||||
// because the TableSyncOperation uses reflection to resolve and invoke fetchers.
|
||||
//
|
||||
// Integration test should verify:
|
||||
// 1. Daily updates use staging table → merge path
|
||||
// 2. Hourly updates use staging table → merge path
|
||||
// 3. Staging tables are created with unique suffixes
|
||||
// 4. MERGE correctly handles INSERT/UPDATE based on LastUpdateDT
|
||||
// 5. Staging tables are cleaned up after success and failure
|
||||
Assert.True(true, "See integration tests for incremental update path coverage");
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Batching Tests
|
||||
|
||||
// Note: Batching tests for incremental updates require integration testing
|
||||
// because they depend on the reflection-based fetcher resolution.
|
||||
// The batching logic is tested implicitly through integration tests.
|
||||
//
|
||||
// Test scenario to verify:
|
||||
// - Large dataset (25 entities) with BatchSize=10 should create 3 batches
|
||||
|
||||
#endregion
|
||||
|
||||
#region Post-Processor Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WithPostProcessor_InvokesPostProcessor()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("MisData", UpdateTypes.Mass, postProcessor: nameof(MockPostProcessor));
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(0, TimeSpan.Zero, true));
|
||||
|
||||
var mockPostProcessor = new MockPostProcessor();
|
||||
_serviceProvider.GetService(typeof(MockPostProcessor)).Returns(mockPostProcessor);
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
mockPostProcessor.WasInvoked.ShouldBeTrue();
|
||||
mockPostProcessor.TableNameReceived.ShouldBe("MisData");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WithoutPostProcessor_SkipsPostProcessing()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass, postProcessor: null);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(0, TimeSpan.Zero, true));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: No post-processor service resolution should occur
|
||||
_serviceProvider.DidNotReceive().GetService(typeof(IPostProcessor));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Metrics Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_RecordsOperationStartedMetric()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Mass);
|
||||
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
SetupMockFetcher(task, AsyncEnumerable.Empty<TestEntity>());
|
||||
SetupMockMergeConfiguration();
|
||||
|
||||
_bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<TestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(new MassInsertResult(100, TimeSpan.Zero, true));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: Metrics were recorded (using real metrics, just verify no exceptions)
|
||||
// Detailed metric verification is in DataSyncMetricsTests
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Staging Table Cleanup Tests
|
||||
|
||||
// Note: Staging table cleanup tests require integration testing because
|
||||
// they depend on the incremental update path with reflection-based fetcher resolution.
|
||||
//
|
||||
// Test scenarios to verify:
|
||||
// - On successful merge, staging table is dropped
|
||||
// - On merge failure, staging table is still dropped (finally block)
|
||||
// - On bulk copy failure, staging table is still dropped
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helper Methods
|
||||
|
||||
private TableSyncOperation CreateSut()
|
||||
{
|
||||
return new TableSyncOperation(
|
||||
_serviceProvider,
|
||||
_connectionFactory,
|
||||
_updateRepository,
|
||||
_bulkMergeHelper,
|
||||
_configRegistry,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
}
|
||||
|
||||
private static DataUpdateTask CreateTask(
|
||||
string tableName,
|
||||
UpdateTypes updateType,
|
||||
bool prepurge = true,
|
||||
bool reindex = true,
|
||||
string? postProcessor = null)
|
||||
{
|
||||
return new DataUpdateTask
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
UpdateType = updateType,
|
||||
MinimumDt = updateType == UpdateTypes.Mass ? null : DateTime.UtcNow.AddDays(-1),
|
||||
Config = new DataSourceConfig
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
FetcherTypeName = nameof(MockDataFetcher),
|
||||
PostProcessorTypeName = postProcessor,
|
||||
IsEnabled = true,
|
||||
MassConfig = new ScheduleConfig
|
||||
{
|
||||
Enabled = true,
|
||||
IntervalMinutes = 10080,
|
||||
PrepurgeData = prepurge,
|
||||
ReIndexData = reindex
|
||||
},
|
||||
DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 },
|
||||
HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 }
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void SetupMockFetcher(DataUpdateTask task, IAsyncEnumerable<TestEntity> entities, int batchSize = 1000)
|
||||
{
|
||||
var fetcher = new MockDataFetcher(entities);
|
||||
_serviceProvider.GetService(typeof(MockDataFetcher)).Returns(fetcher);
|
||||
}
|
||||
|
||||
private void SetupMockMergeConfiguration()
|
||||
{
|
||||
var mockConfig = Substitute.For<IMergeConfiguration<TestEntity>>();
|
||||
mockConfig.TableName.Returns("TestTable");
|
||||
mockConfig.MatchOn.Returns(x => x.Id);
|
||||
mockConfig.UpdateColumns.Returns(x => new { x.Name, x.LastUpdateDT });
|
||||
mockConfig.UpdateWhen.Returns((Expression<Func<TestEntity, TestEntity, bool>>?)null);
|
||||
mockConfig.InsertColumns.Returns((Expression<Func<TestEntity, object>>?)null);
|
||||
|
||||
_configRegistry.GetConfiguration<TestEntity>().Returns(mockConfig);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
|
||||
#region Test Support Classes
|
||||
|
||||
public class TestEntity
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public DateTime LastUpdateDT { get; set; } = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
public class MockDataFetcher : IDataFetcher<TestEntity>
|
||||
{
|
||||
private readonly IAsyncEnumerable<TestEntity> _entities;
|
||||
|
||||
public MockDataFetcher(IAsyncEnumerable<TestEntity>? entities = null)
|
||||
{
|
||||
_entities = entities ?? AsyncEnumerable.Empty<TestEntity>();
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<TestEntity> FetchAsync(DateTime? minimumDt, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return _entities;
|
||||
}
|
||||
}
|
||||
|
||||
public class MockPostProcessor : IPostProcessor
|
||||
{
|
||||
public bool WasInvoked { get; private set; }
|
||||
public string? TableNameReceived { get; private set; }
|
||||
|
||||
public Task ProcessAsync(string tableName, CancellationToken cancellationToken = default)
|
||||
{
|
||||
WasInvoked = true;
|
||||
TableNameReceived = tableName;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
@@ -0,0 +1,646 @@
|
||||
# Protobuf Cache Conversion Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Convert development cache files from zstd-compressed JSON to zstd-compressed Protocol Buffers for faster deserialization and simpler code.
|
||||
|
||||
**Architecture:** Create a standalone converter tool to transform existing JSON files to protobuf format. Replace `JsonZstdFileSource` with `ProtobufZstdFileSource` that uses protobuf-net-data's `DataSerializer.Deserialize()` to get an `IDataReader` directly. Remove schema definitions from DevEtl classes since protobuf embeds the schema.
|
||||
|
||||
**Tech Stack:** protobuf-net-data, ZstdSharp.Port, .NET 10
|
||||
|
||||
---
|
||||
|
||||
## Task 1: Create Converter Tool Project
|
||||
|
||||
**Files:**
|
||||
- Create: `Tools/CacheConverter/CacheConverter.csproj`
|
||||
- Create: `Tools/CacheConverter/Program.cs`
|
||||
|
||||
**Step 1: Create project directory**
|
||||
|
||||
```bash
|
||||
mkdir -p Tools/CacheConverter
|
||||
```
|
||||
|
||||
**Step 2: Create project file**
|
||||
|
||||
Create `Tools/CacheConverter/CacheConverter.csproj`:
|
||||
|
||||
```xml
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="ZstdSharp.Port" Version="0.8.1" />
|
||||
<PackageReference Include="protobuf-net.Data" Version="3.0.32" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
```
|
||||
|
||||
**Step 3: Create converter program**
|
||||
|
||||
Create `Tools/CacheConverter/Program.cs`:
|
||||
|
||||
```csharp
|
||||
using System.Data;
|
||||
using System.Text.Json;
|
||||
using ProtoBuf.Data;
|
||||
using ZstdSharp;
|
||||
|
||||
if (args.Length == 0)
|
||||
{
|
||||
Console.WriteLine("Usage: CacheConverter <cache-directory>");
|
||||
Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES");
|
||||
return 1;
|
||||
}
|
||||
|
||||
var cacheDir = args[0];
|
||||
if (!Directory.Exists(cacheDir))
|
||||
{
|
||||
Console.WriteLine($"Error: Directory not found: {cacheDir}");
|
||||
return 1;
|
||||
}
|
||||
|
||||
var jsonFiles = Directory.GetFiles(cacheDir, "*.json.zstd");
|
||||
Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert");
|
||||
|
||||
long totalOriginalSize = 0;
|
||||
long totalNewSize = 0;
|
||||
|
||||
foreach (var jsonFile in jsonFiles)
|
||||
{
|
||||
var baseName = Path.GetFileName(jsonFile).Replace(".json.zstd", "");
|
||||
var outputFile = Path.Combine(cacheDir, $"{baseName}.pb.zstd");
|
||||
|
||||
Console.Write($"Converting {baseName}... ");
|
||||
|
||||
try
|
||||
{
|
||||
var originalSize = new FileInfo(jsonFile).Length;
|
||||
totalOriginalSize += originalSize;
|
||||
|
||||
// Read and decompress JSON
|
||||
using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan);
|
||||
using var decompressStream = new DecompressionStream(inputFs);
|
||||
using var bufferedInput = new BufferedStream(decompressStream, 256 * 1024);
|
||||
|
||||
// Parse JSON array into list of dictionaries
|
||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||||
var records = JsonSerializer.Deserialize<List<Dictionary<string, JsonElement>>>(bufferedInput, jsonOptions)
|
||||
?? throw new InvalidDataException("Failed to parse JSON array");
|
||||
|
||||
if (records.Count == 0)
|
||||
{
|
||||
Console.WriteLine("SKIP (empty)");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Create DataTable from records
|
||||
var dataTable = CreateDataTable(records);
|
||||
|
||||
// Write protobuf with zstd compression
|
||||
using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024);
|
||||
using var compressStream = new CompressionStream(outputFs, level: 3);
|
||||
using var reader = dataTable.CreateDataReader();
|
||||
DataSerializer.Serialize(compressStream, reader);
|
||||
compressStream.Flush();
|
||||
|
||||
var newSize = new FileInfo(outputFile).Length;
|
||||
totalNewSize += newSize;
|
||||
|
||||
var ratio = (double)newSize / originalSize * 100;
|
||||
Console.WriteLine($"OK ({originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"ERROR: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
Console.WriteLine();
|
||||
Console.WriteLine($"Total: {totalOriginalSize:N0} -> {totalNewSize:N0} bytes ({(double)totalNewSize / totalOriginalSize * 100:F1}%)");
|
||||
return 0;
|
||||
|
||||
static DataTable CreateDataTable(List<Dictionary<string, JsonElement>> records)
|
||||
{
|
||||
var dt = new DataTable();
|
||||
var firstRecord = records[0];
|
||||
|
||||
// Infer column types from first record
|
||||
foreach (var (key, value) in firstRecord)
|
||||
{
|
||||
var colType = InferType(value);
|
||||
dt.Columns.Add(key, colType);
|
||||
}
|
||||
|
||||
// Add all rows
|
||||
foreach (var record in records)
|
||||
{
|
||||
var row = dt.NewRow();
|
||||
foreach (DataColumn col in dt.Columns)
|
||||
{
|
||||
if (record.TryGetValue(col.ColumnName, out var value))
|
||||
{
|
||||
row[col] = ConvertValue(value, col.DataType);
|
||||
}
|
||||
else
|
||||
{
|
||||
row[col] = DBNull.Value;
|
||||
}
|
||||
}
|
||||
dt.Rows.Add(row);
|
||||
}
|
||||
|
||||
return dt;
|
||||
}
|
||||
|
||||
static Type InferType(JsonElement element) => element.ValueKind switch
|
||||
{
|
||||
JsonValueKind.String => typeof(string),
|
||||
JsonValueKind.Number when element.TryGetInt64(out _) => typeof(long),
|
||||
JsonValueKind.Number => typeof(decimal),
|
||||
JsonValueKind.True or JsonValueKind.False => typeof(bool),
|
||||
JsonValueKind.Null => typeof(string), // Default nullable to string
|
||||
_ => typeof(string)
|
||||
};
|
||||
|
||||
static object ConvertValue(JsonElement element, Type targetType)
|
||||
{
|
||||
if (element.ValueKind == JsonValueKind.Null)
|
||||
return DBNull.Value;
|
||||
|
||||
if (targetType == typeof(string))
|
||||
{
|
||||
var str = element.GetString();
|
||||
// Try to parse as DateTime if it looks like one
|
||||
if (str != null && DateTime.TryParse(str, out var dt))
|
||||
return dt;
|
||||
return str ?? DBNull.Value;
|
||||
}
|
||||
|
||||
if (targetType == typeof(long))
|
||||
return element.GetInt64();
|
||||
|
||||
if (targetType == typeof(decimal))
|
||||
return element.GetDecimal();
|
||||
|
||||
if (targetType == typeof(bool))
|
||||
return element.GetBoolean();
|
||||
|
||||
return element.GetString() ?? DBNull.Value;
|
||||
}
|
||||
```
|
||||
|
||||
**Step 4: Test the converter builds**
|
||||
|
||||
```bash
|
||||
cd Tools/CacheConverter && dotnet build
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add Tools/CacheConverter
|
||||
git commit -m "feat: add protobuf cache converter tool"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Run Converter on Cache Files
|
||||
|
||||
**Step 1: Run converter**
|
||||
|
||||
```bash
|
||||
cd Tools/CacheConverter
|
||||
dotnet run -- ../../CACHED_DB_FILES
|
||||
```
|
||||
|
||||
Expected: All 22 files convert successfully with size comparison output.
|
||||
|
||||
**Step 2: Verify output files exist**
|
||||
|
||||
```bash
|
||||
ls -la ../../CACHED_DB_FILES/*.pb.zstd | wc -l
|
||||
```
|
||||
|
||||
Expected: 22 files
|
||||
|
||||
**Step 3: Commit converted files (optional - they may be gitignored)**
|
||||
|
||||
If the files should be tracked:
|
||||
```bash
|
||||
git add ../../CACHED_DB_FILES/*.pb.zstd
|
||||
git commit -m "data: convert cache files to protobuf format"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Add protobuf-net-data Package to DataSync.Dev
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj`
|
||||
|
||||
**Step 1: Add package reference**
|
||||
|
||||
Add to `JdeScoping.DataSync.Dev.csproj` ItemGroup:
|
||||
|
||||
```xml
|
||||
<PackageReference Include="protobuf-net.Data" Version="3.0.32" />
|
||||
```
|
||||
|
||||
**Step 2: Restore and verify**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet restore src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
```
|
||||
|
||||
Expected: Restore succeeded.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add NEW/src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
git commit -m "deps: add protobuf-net-data to DataSync.Dev"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Create ProtobufZstdFileSource
|
||||
|
||||
**Files:**
|
||||
- Create: `NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs`
|
||||
|
||||
**Step 1: Create the source class**
|
||||
|
||||
Create `NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs`:
|
||||
|
||||
```csharp
|
||||
using System.Data;
|
||||
using JdeScoping.DataSync.Etl.Contracts;
|
||||
using ProtoBuf.Data;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace JdeScoping.DataSync.Dev.Sources;
|
||||
|
||||
/// <summary>
|
||||
/// Import source that reads from a zstd-compressed protobuf file.
|
||||
/// Uses protobuf-net-data for IDataReader deserialization.
|
||||
/// </summary>
|
||||
public sealed class ProtobufZstdFileSource : IImportSource
|
||||
{
|
||||
private const int FileBufferSize = 256 * 1024; // 256 KB
|
||||
private const int DecompressBufferSize = 256 * 1024; // 256 KB
|
||||
|
||||
private readonly string _filePath;
|
||||
private FileStream? _fileStream;
|
||||
private DecompressionStream? _decompressionStream;
|
||||
private BufferedStream? _bufferedStream;
|
||||
private IDataReader? _reader;
|
||||
|
||||
public string SourceName => $"Protobuf:{Path.GetFileName(_filePath)}";
|
||||
|
||||
public ProtobufZstdFileSource(string filePath)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(filePath))
|
||||
throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));
|
||||
|
||||
if (!File.Exists(filePath))
|
||||
throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);
|
||||
|
||||
_filePath = filePath;
|
||||
}
|
||||
|
||||
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_fileStream != null)
|
||||
throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");
|
||||
|
||||
try
|
||||
{
|
||||
_fileStream = new FileStream(
|
||||
_filePath,
|
||||
FileMode.Open,
|
||||
FileAccess.Read,
|
||||
FileShare.Read,
|
||||
bufferSize: FileBufferSize,
|
||||
FileOptions.SequentialScan);
|
||||
|
||||
_decompressionStream = new DecompressionStream(_fileStream);
|
||||
_bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize);
|
||||
|
||||
// protobuf-net-data returns IDataReader directly!
|
||||
_reader = DataSerializer.Deserialize(_bufferedStream);
|
||||
|
||||
return Task.FromResult(_reader);
|
||||
}
|
||||
catch
|
||||
{
|
||||
Cleanup();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private void Cleanup()
|
||||
{
|
||||
_reader?.Dispose();
|
||||
_bufferedStream?.Dispose();
|
||||
_decompressionStream?.Dispose();
|
||||
_fileStream?.Dispose();
|
||||
_reader = null;
|
||||
_bufferedStream = null;
|
||||
_decompressionStream = null;
|
||||
_fileStream = null;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_reader != null)
|
||||
{
|
||||
_reader.Dispose();
|
||||
_reader = null;
|
||||
}
|
||||
|
||||
if (_bufferedStream != null)
|
||||
{
|
||||
await _bufferedStream.DisposeAsync();
|
||||
_bufferedStream = null;
|
||||
}
|
||||
|
||||
if (_decompressionStream != null)
|
||||
{
|
||||
await _decompressionStream.DisposeAsync();
|
||||
_decompressionStream = null;
|
||||
}
|
||||
|
||||
if (_fileStream != null)
|
||||
{
|
||||
await _fileStream.DisposeAsync();
|
||||
_fileStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Verify build**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add NEW/src/JdeScoping.DataSync.Dev/Sources/ProtobufZstdFileSource.cs
|
||||
git commit -m "feat: add ProtobufZstdFileSource for reading protobuf cache files"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: Update BranchDevEtl (First Migration)
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs`
|
||||
|
||||
**Step 1: Update BranchDevEtl**
|
||||
|
||||
Replace contents of `NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs`:
|
||||
|
||||
```csharp
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Etl.Destinations;
|
||||
using JdeScoping.DataSync.Etl.Pipeline;
|
||||
using JdeScoping.DataSync.Dev.Sources;
|
||||
|
||||
namespace JdeScoping.DataSync.Dev;
|
||||
|
||||
/// <summary>
|
||||
/// Development ETL pipeline for the Branch table.
|
||||
/// </summary>
|
||||
public static class BranchDevEtl
|
||||
{
|
||||
public static readonly string TableName = "Branch";
|
||||
public static readonly string CacheFileName = "branch.pb.zstd";
|
||||
|
||||
public static EtlPipeline Create(IDbConnectionFactory connectionFactory, string cacheFilePath)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(connectionFactory);
|
||||
|
||||
if (string.IsNullOrWhiteSpace(cacheFilePath))
|
||||
throw new ArgumentException("Cache file path is required.", nameof(cacheFilePath));
|
||||
|
||||
return new EtlPipelineBuilder()
|
||||
.WithName($"{TableName}_Dev")
|
||||
.WithSource(new ProtobufZstdFileSource(cacheFilePath))
|
||||
.WithDestination(new DbBulkImportDestination(connectionFactory, TableName))
|
||||
.Build();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Verify build**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 3: Run Branch test (if protobuf files exist)**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet test tests/JdeScoping.DataSync.Dev.Tests --filter "FullyQualifiedName~BranchDevEtl" --verbosity normal
|
||||
```
|
||||
|
||||
Expected: Tests pass (or skip if cache files don't exist yet).
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs
|
||||
git commit -m "refactor: migrate BranchDevEtl to protobuf source"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6: Update Remaining DevEtl Files (Batch Migration)
|
||||
|
||||
**Files:**
|
||||
- Modify: All remaining `*DevEtl.cs` files (20 files)
|
||||
|
||||
For each file, apply the same pattern:
|
||||
1. Remove `using JdeScoping.DataSync.Dev.Models;` (no longer needed)
|
||||
2. Remove `private static readonly JsonColumnSchema[] Schema = [...]`
|
||||
3. Change `CacheFileName` from `.json.zstd` to `.pb.zstd`
|
||||
4. Change `new JsonZstdFileSource(cacheFilePath, Schema)` to `new ProtobufZstdFileSource(cacheFilePath)`
|
||||
|
||||
**Step 1: Update all files**
|
||||
|
||||
Apply the pattern from Task 5 to each file:
|
||||
|
||||
| File | CacheFileName |
|
||||
|------|---------------|
|
||||
| `FunctionCodeDevEtl.cs` | `functioncode.pb.zstd` |
|
||||
| `ItemDevEtl.cs` | `item.pb.zstd` |
|
||||
| `JdeUserDevEtl.cs` | `jdeuser.pb.zstd` |
|
||||
| `LotDevEtl.cs` | `lot.pb.zstd` |
|
||||
| `LotUsageCurrDevEtl.cs` | `lotusage_curr.pb.zstd` |
|
||||
| `LotUsageHistDevEtl.cs` | `lotusage_hist.pb.zstd` |
|
||||
| `MisDataDevEtl.cs` | `misdata.pb.zstd` |
|
||||
| `OrgHierarchyDevEtl.cs` | `orghierarchy.pb.zstd` |
|
||||
| `ProfitCenterDevEtl.cs` | `profitcenter.pb.zstd` |
|
||||
| `RouteMasterDevEtl.cs` | `routemaster.pb.zstd` |
|
||||
| `WorkCenterDevEtl.cs` | `workcenter.pb.zstd` |
|
||||
| `WorkOrderComponentCurrDevEtl.cs` | `workordercomponent_curr.pb.zstd` |
|
||||
| `WorkOrderComponentHistDevEtl.cs` | `workordercomponent_hist.pb.zstd` |
|
||||
| `WorkOrderCurrDevEtl.cs` | `workorder_curr.pb.zstd` |
|
||||
| `WorkOrderHistDevEtl.cs` | `workorder_hist.pb.zstd` |
|
||||
| `WorkOrderRoutingDevEtl.cs` | `workorderrouting.pb.zstd` |
|
||||
| `WorkOrderStepCurrDevEtl.cs` | `workorderstep_curr.pb.zstd` |
|
||||
| `WorkOrderStepHistDevEtl.cs` | `workorderstep_hist.pb.zstd` |
|
||||
| `WorkOrderTimeCurrDevEtl.cs` | `workordertime_curr.pb.zstd` |
|
||||
| `WorkOrderTimeHistDevEtl.cs` | `workordertime_hist.pb.zstd` |
|
||||
|
||||
**Step 2: Verify build**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add NEW/src/JdeScoping.DataSync.Dev/*DevEtl.cs
|
||||
git commit -m "refactor: migrate all DevEtl files to protobuf source"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7: Delete Obsolete JSON Source Files
|
||||
|
||||
**Files:**
|
||||
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs`
|
||||
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs`
|
||||
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs`
|
||||
- Delete: `NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs`
|
||||
|
||||
**Step 1: Delete files**
|
||||
|
||||
```bash
|
||||
rm NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs
|
||||
rm NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs
|
||||
rm NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs
|
||||
rm NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs
|
||||
```
|
||||
|
||||
**Step 2: Remove empty Models directory if empty**
|
||||
|
||||
```bash
|
||||
rmdir NEW/src/JdeScoping.DataSync.Dev/Models 2>/dev/null || true
|
||||
```
|
||||
|
||||
**Step 3: Verify build**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet build src/JdeScoping.DataSync.Dev/JdeScoping.DataSync.Dev.csproj
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add -A NEW/src/JdeScoping.DataSync.Dev/
|
||||
git commit -m "chore: remove obsolete JSON source files"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 8: Update DevEtlRegistry Comment
|
||||
|
||||
**Files:**
|
||||
- Modify: `NEW/src/JdeScoping.DataSync.Dev/DevEtlRegistry.cs`
|
||||
|
||||
**Step 1: Update class comment**
|
||||
|
||||
Change line 9-10 from:
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// Registry for development ETL pipelines that load from cached JSON files.
|
||||
/// </summary>
|
||||
```
|
||||
|
||||
To:
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// Registry for development ETL pipelines that load from cached protobuf files.
|
||||
/// </summary>
|
||||
```
|
||||
|
||||
**Step 2: Commit**
|
||||
|
||||
```bash
|
||||
git add NEW/src/JdeScoping.DataSync.Dev/DevEtlRegistry.cs
|
||||
git commit -m "docs: update DevEtlRegistry comment for protobuf"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 9: Run All Tests
|
||||
|
||||
**Step 1: Build entire solution**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet build
|
||||
```
|
||||
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 2: Run DataSync.Dev tests**
|
||||
|
||||
```bash
|
||||
cd NEW && dotnet test tests/JdeScoping.DataSync.Dev.Tests --verbosity normal
|
||||
```
|
||||
|
||||
Expected: All tests pass (or skip if cache files don't exist).
|
||||
|
||||
---
|
||||
|
||||
## Task 10: Clean Up Old JSON Cache Files (Manual)
|
||||
|
||||
**After verifying everything works:**
|
||||
|
||||
```bash
|
||||
rm CACHED_DB_FILES/*.json.zstd
|
||||
```
|
||||
|
||||
This step is manual and should only be done after confirming the protobuf files work correctly.
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| Task | Description | Commit Message |
|
||||
|------|-------------|----------------|
|
||||
| 1 | Create converter tool | `feat: add protobuf cache converter tool` |
|
||||
| 2 | Run converter | (optional data commit) |
|
||||
| 3 | Add protobuf-net-data package | `deps: add protobuf-net-data to DataSync.Dev` |
|
||||
| 4 | Create ProtobufZstdFileSource | `feat: add ProtobufZstdFileSource for reading protobuf cache files` |
|
||||
| 5 | Update BranchDevEtl | `refactor: migrate BranchDevEtl to protobuf source` |
|
||||
| 6 | Update remaining DevEtl files | `refactor: migrate all DevEtl files to protobuf source` |
|
||||
| 7 | Delete obsolete JSON files | `chore: remove obsolete JSON source files` |
|
||||
| 8 | Update registry comment | `docs: update DevEtlRegistry comment for protobuf` |
|
||||
| 9 | Run all tests | (verification only) |
|
||||
| 10 | Delete old cache files | (manual cleanup) |
|
||||
Reference in New Issue
Block a user