26ff8d9b4f
Set up repository with legacy .NET Framework 4.8 source (OLD/), new .NET 10 Blazor solution (NEW/), OpenSpec specifications, documentation, and project configuration.
453 lines
17 KiB
C#
453 lines
17 KiB
C#
using System.Diagnostics.Metrics;
|
|
using System.Linq.Expressions;
|
|
using System.Runtime.CompilerServices;
|
|
using JdeScoping.Core.Interfaces;
|
|
using JdeScoping.Core.Models.Enums;
|
|
using JdeScoping.DataAccess.Interfaces;
|
|
using JdeScoping.DataSync.Configuration;
|
|
using JdeScoping.DataSync.Contracts;
|
|
using JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
|
using JdeScoping.DataSync.Models;
|
|
using JdeScoping.DataSync.Services;
|
|
using JdeScoping.DataSync.Telemetry;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using NSubstitute;
|
|
|
|
namespace JdeScoping.DataSync.IntegrationTests;
|
|
|
|
/// <summary>
|
|
/// Integration tests for TableSyncOperation.
|
|
/// Tests end-to-end sync paths (incremental and mass) against real SQL Server.
|
|
/// </summary>
|
|
[Collection("Database")]
|
|
public class TableSyncOperationTests : IAsyncLifetime
|
|
{
|
|
private readonly SqlServerFixture _fixture;
|
|
private SqlConnection _connection = null!;
|
|
|
|
/// <summary>
/// Captures the SQL Server fixture handed in by the test framework; the fixture is later used
/// to open connections and to clean up the test tables.
/// </summary>
/// <param name="fixture">Fixture providing database access for this test class.</param>
public TableSyncOperationTests(SqlServerFixture fixture) => _fixture = fixture;
|
|
|
|
/// <summary>
/// Per-test setup: obtains a connection from the fixture, stores it for the test's own queries,
/// and then asks the fixture to clean up its tables.
/// </summary>
public async Task InitializeAsync()
{
    var connection = await _fixture.CreateConnectionAsync();
    _connection = connection;

    await _fixture.CleanupTablesAsync();
}
|
|
|
|
/// <summary>
/// Per-test teardown: disposes the connection that was opened during setup.
/// </summary>
/// <remarks>
/// The connection field is initialised with <c>null!</c> and is only assigned once setup has
/// obtained a connection. If teardown runs after setup failed before that assignment, the guard
/// below prevents a <see cref="NullReferenceException"/> from being raised on top of the
/// original setup error.
/// </remarks>
public async Task DisposeAsync()
{
    if (_connection is null)
    {
        return;
    }

    await _connection.DisposeAsync();
}
|
|
|
|
#region Daily Update (Incremental Path) Tests
|
|
|
|
/// <summary>
/// Runs a Daily sync against a table seeded with orders 1 and 2, feeding it order 1 (changed)
/// and order 3 (new). Expects three rows afterwards: 1 updated, 2 untouched, 3 inserted.
/// </summary>
[Fact]
public async Task ExecuteAsync_DailyUpdate_UsesStagingAndMerge()
{
    // Arrange
    var baseTime = DateTime.UtcNow;
    var seededAt = baseTime.AddHours(-2);

    // Rows already present in the target table, stamped two hours before the sync batch
    var seededOrders = new List<WorkOrderTestEntity>
    {
        new WorkOrderTestEntity { OrderNumber = 1, Status = "Old", Description = "Existing 1", Quantity = 10, LastUpdateDT = seededAt },
        new WorkOrderTestEntity { OrderNumber = 2, Status = "Old", Description = "Existing 2", Quantity = 20, LastUpdateDT = seededAt }
    };
    await InsertWorkOrdersDirectly(seededOrders);

    // Incoming batch: order 1 collides with a seeded row, order 3 is new
    var incomingOrders = new List<WorkOrderTestEntity>
    {
        new WorkOrderTestEntity { OrderNumber = 1, Status = "Updated", Description = "Updated 1", Quantity = 100, LastUpdateDT = baseTime },
        new WorkOrderTestEntity { OrderNumber = 3, Status = "New", Description = "New 3", Quantity = 30, LastUpdateDT = baseTime }
    };

    var sut = CreateTableSyncOperation(incomingOrders);
    var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: baseTime.AddDays(-1));

    // Act
    await sut.ExecuteAsync(task);

    // Assert
    var queried = await _connection.QueryAsync<WorkOrderTestEntity>(
        "SELECT * FROM WorkOrder_Test ORDER BY OrderNumber");
    var rows = queried.ToList();

    rows.Count.ShouldBe(3);

    // Order 1: overwritten with the incoming values
    var updatedOrder = rows[0];
    updatedOrder.OrderNumber.ShouldBe(1);
    updatedOrder.Status.ShouldBe("Updated");
    updatedOrder.Quantity.ShouldBe(100);

    // Order 2: absent from the batch, so it keeps its seeded status
    var untouchedOrder = rows[1];
    untouchedOrder.OrderNumber.ShouldBe(2);
    untouchedOrder.Status.ShouldBe("Old");

    // Order 3: newly inserted
    var insertedOrder = rows[2];
    insertedOrder.OrderNumber.ShouldBe(3);
    insertedOrder.Status.ShouldBe("New");
}
|
|
|
|
/// <summary>
/// Runs an Hourly sync of 15 generated work orders and expects 15 rows in the target table afterwards.
/// </summary>
[Fact]
public async Task ExecuteAsync_HourlyUpdate_UsesStagingAndMerge()
{
    // Arrange
    const int generatedCount = 15;
    var incomingOrders = TestDataGenerator.GenerateWorkOrders(generatedCount);
    var sut = CreateTableSyncOperation(incomingOrders);
    var task = CreateTask(
        "WorkOrder_Test",
        UpdateTypes.Hourly,
        minimumDt: DateTime.UtcNow.AddHours(-1));

    // Act
    await sut.ExecuteAsync(task);

    // Assert
    var rowCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
    rowCount.ShouldBe(generatedCount);
}
|
|
|
|
#endregion
|
|
|
|
#region Mass Update (Truncate Path) Tests
|
|
|
|
/// <summary>
/// Seeds 50 rows, runs a Mass sync (prepurge enabled) of 30 generated rows, and expects
/// exactly 30 rows to remain.
/// </summary>
[Fact]
public async Task ExecuteAsync_MassUpdate_UsesTruncatePath()
{
    const string countSql = "SELECT COUNT(*) FROM WorkOrder_Test";

    // Arrange: fill the table and confirm the starting row count
    var seededOrders = TestDataGenerator.GenerateWorkOrders(50);
    await InsertWorkOrdersDirectly(seededOrders);

    var rowsBefore = await _connection.ExecuteScalarAsync<int>(countSql);
    rowsBefore.ShouldBe(50);

    // The mass sync delivers a smaller, freshly generated set
    var replacementOrders = TestDataGenerator.GenerateWorkOrders(30);
    var sut = CreateTableSyncOperation(replacementOrders);
    var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);

    // Act
    await sut.ExecuteAsync(task);

    // Assert: only the replacement set is present
    var rowsAfter = await _connection.ExecuteScalarAsync<int>(countSql);
    rowsAfter.ShouldBe(30);
}
|
|
|
|
/// <summary>
/// Seeds 25 rows, runs a Mass sync (prepurge enabled) whose fetcher yields nothing, and expects
/// the table to end up empty.
/// </summary>
[Fact]
public async Task ExecuteAsync_MassUpdate_WithEmptyRecords_TruncatesTable()
{
    // Arrange: fill the table, then sync from an empty source
    var seededOrders = TestDataGenerator.GenerateWorkOrders(25);
    await InsertWorkOrdersDirectly(seededOrders);

    var noOrders = new List<WorkOrderTestEntity>();
    var sut = CreateTableSyncOperation(noOrders);
    var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);

    // Act
    await sut.ExecuteAsync(task);

    // Assert: nothing is left in the table
    var rowCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
    rowCount.ShouldBe(0);
}
|
|
|
|
#endregion
|
|
|
|
#region Temp Table Cleanup Tests
|
|
|
|
/// <summary>
/// Runs a successful Daily sync of 10 generated rows and expects no <c>#TEMP_*</c> tables to be
/// listed in tempdb afterwards.
/// </summary>
/// <remarks>
/// NOTE(review): the IBulkMergeHelper handed to the operation is an NSubstitute stand-in created in
/// CreateTableSyncOperationWithFetcher, and that stand-in writes rows with plain INSERT/UPDATE
/// statements. Confirm whether anything in this setup can create a temp table at all; if not,
/// the assertion below passes trivially.
/// </remarks>
[Fact]
public async Task ExecuteAsync_OnSuccess_CleansTempTable()
{
    // Arrange
    var incomingOrders = TestDataGenerator.GenerateWorkOrders(10);
    var sut = CreateTableSyncOperation(incomingOrders);
    var task = CreateTask(
        "WorkOrder_Test",
        UpdateTypes.Daily,
        minimumDt: DateTime.UtcNow.AddDays(-1));

    // Act
    await sut.ExecuteAsync(task);

    // Assert: no temp tables should remain (BulkMergeHelper uses #TEMP_* naming)
    const string tempTableCountSql = @"
        SELECT COUNT(*)
        FROM tempdb.sys.tables
        WHERE name LIKE '#TEMP[_]%'";

    var leftoverTempTables = await _connection.ExecuteScalarAsync<int>(tempTableCountSql);

    leftoverTempTables.ShouldBe(0);
}
|
|
|
|
/// <summary>
/// Runs a Daily sync whose fetcher yields five rows and then throws. Expects the
/// <see cref="InvalidOperationException"/> to surface from the operation and no <c>#TEMP_*</c>
/// tables to be listed in tempdb afterwards.
/// </summary>
/// <remarks>
/// NOTE(review): the IBulkMergeHelper in use is an NSubstitute stand-in created in
/// CreateTableSyncOperationWithFetcher. Confirm whether a temp table can be created in this
/// setup; if not, the temp-table assertion passes trivially.
/// </remarks>
[Fact]
public async Task ExecuteAsync_OnFetcherError_CleansTempTable()
{
    // Arrange: a fetcher that breaks part-way through its stream
    var brokenFetcher = new FailingFetcher(failAfterCount: 5);
    var sut = CreateTableSyncOperationWithFetcher(brokenFetcher);
    var task = CreateTask(
        "WorkOrder_Test",
        UpdateTypes.Daily,
        minimumDt: DateTime.UtcNow.AddDays(-1),
        fetcherTypeName: nameof(FailingFetcher));

    // Act & Assert: the fetcher's failure propagates out of the operation
    await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));

    // Temp tables should still be cleaned up
    const string tempTableCountSql = @"
        SELECT COUNT(*)
        FROM tempdb.sys.tables
        WHERE name LIKE '#TEMP[_]%'";

    var leftoverTempTables = await _connection.ExecuteScalarAsync<int>(tempTableCountSql);

    leftoverTempTables.ShouldBe(0);
}
|
|
|
|
#endregion
|
|
|
|
#region Helper Methods
|
|
|
|
/// <summary>
/// Convenience overload: wraps <paramref name="records"/> in a <see cref="TestFetcher"/> and
/// builds the operation around it.
/// </summary>
/// <param name="records">Rows the fetcher will stream to the operation.</param>
/// <returns>The operation under test.</returns>
private TableSyncOperation CreateTableSyncOperation(List<WorkOrderTestEntity> records)
    => CreateTableSyncOperationWithFetcher(new TestFetcher(records));
|
|
|
|
/// <summary>
/// Builds the <see cref="TableSyncOperation"/> under test. The operation receives the supplied
/// fetcher (through a stubbed service provider), a connection factory that opens real connections
/// to the fixture database, and NSubstitute stand-ins for the update repository, the bulk merge
/// helper and the merge configuration registry.
/// </summary>
/// <param name="fetcher">
/// Fetcher returned by the stubbed service provider when either <see cref="TestFetcher"/> or
/// <see cref="FailingFetcher"/> is requested.
/// </param>
/// <returns>The operation under test.</returns>
/// <remarks>
/// NOTE(review): the <see cref="ServiceProvider"/> built here is never disposed. It supplies the
/// <see cref="IMeterFactory"/> used by the metrics object, so it has to outlive this method;
/// consider tracking it on the test class and disposing it during teardown.
/// </remarks>
private TableSyncOperation CreateTableSyncOperationWithFetcher(IDataFetcher<WorkOrderTestEntity> fetcher)
{
    // Real DI container; only the meter factory is resolved from it below
    var services = new ServiceCollection();
    services.AddMetrics();
    services.AddSingleton(fetcher);
    var provider = services.BuildServiceProvider();

    var connectionFactory = CreateConnectionFactory();

    // Update repository stub: every StartUpdateAsync call reports 1
    var updateRepository = Substitute.For<IDataUpdateRepository>();
    updateRepository.StartUpdateAsync(
            Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
            Arg.Any<UpdateTypes>(), Arg.Any<CancellationToken>())
        .Returns(1);

    var configRegistry = CreateMergeConfigurationRegistry();
    var bulkMergeHelper = CreateBulkMergeHelper();

    var options = Options.Create(new DataSyncOptions
    {
        BatchSize = 1000,
        BulkCopyBatchSize = 100
    });

    var meterFactory = provider.GetRequiredService<IMeterFactory>();
    var metrics = new DataSyncMetrics(meterFactory);

    // Service provider stub that hands back the supplied fetcher for both test fetcher types
    var testServiceProvider = Substitute.For<IServiceProvider>();
    testServiceProvider.GetService(typeof(TestFetcher)).Returns(fetcher);
    testServiceProvider.GetService(typeof(FailingFetcher)).Returns(fetcher);

    return new TableSyncOperation(
        testServiceProvider,
        connectionFactory,
        updateRepository,
        bulkMergeHelper,
        configRegistry,
        options,
        NullLogger<TableSyncOperation>.Instance,
        metrics);
}

/// <summary>
/// Creates a connection factory stub that opens a new connection to the fixture database on every call.
/// </summary>
private IDbConnectionFactory CreateConnectionFactory()
{
    var connectionFactory = Substitute.For<IDbConnectionFactory>();
    connectionFactory.CreateLotFinderConnectionAsync(Arg.Any<CancellationToken>())
        .Returns(async callInfo =>
        {
            var conn = new SqlConnection(_fixture.ConnectionString);
            try
            {
                // Honour the caller's token instead of opening unconditionally
                await conn.OpenAsync(callInfo.Arg<CancellationToken>());
                return conn;
            }
            catch
            {
                // Do not leak the connection object when opening fails or is cancelled
                await conn.DisposeAsync();
                throw;
            }
        });

    return connectionFactory;
}

/// <summary>
/// Creates a registry stub whose WorkOrderTestEntity configuration targets WorkOrder_Test and matches on OrderNumber.
/// </summary>
private static IMergeConfigurationRegistry CreateMergeConfigurationRegistry()
{
    var mockConfig = Substitute.For<IMergeConfiguration<WorkOrderTestEntity>>();
    mockConfig.TableName.Returns("WorkOrder_Test");
    mockConfig.MatchOn.Returns(x => x.OrderNumber);
    mockConfig.UpdateColumns.Returns(x => new { x.Status, x.Description, x.Quantity, x.LastUpdateDT });
    mockConfig.UpdateWhen.Returns((src, tgt) => src.LastUpdateDT > tgt.LastUpdateDT);
    mockConfig.InsertColumns.Returns((Expression<Func<WorkOrderTestEntity, object>>?)null);

    var configRegistry = Substitute.For<IMergeConfigurationRegistry>();
    configRegistry.GetConfiguration<WorkOrderTestEntity>().Returns(mockConfig);
    return configRegistry;
}

/// <summary>
/// Creates a merge helper stub that applies the streamed rows to the real WorkOrder_Test table with plain SQL.
/// </summary>
private IBulkMergeHelper CreateBulkMergeHelper()
{
    var bulkMergeHelper = Substitute.For<IBulkMergeHelper>();

    // Merge path: drain the stream, then update-or-insert each row by OrderNumber.
    // The UpdateWhen expression passed by the caller is not evaluated here; existing rows are always overwritten.
    bulkMergeHelper.MergeAsync(
            Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
            Arg.Any<string>(),
            Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
            Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
            Arg.Any<Expression<Func<WorkOrderTestEntity, WorkOrderTestEntity, bool>>>(),
            Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
            Arg.Any<string>(),
            Arg.Any<int>(),
            Arg.Any<bool>(),
            Arg.Any<CancellationToken>())
        .Returns(async callInfo =>
        {
            var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
            var records = await data.ToListAsync();

            foreach (var record in records)
            {
                await UpsertWorkOrderAsync(record);
            }

            // NOTE(review): the first two arguments both carry the processed row count and the
            // next two are zero; confirm the parameter meanings against MergeResult.
            return new MergeResult(
                records.Count,
                records.Count,
                0,
                0,
                TimeSpan.Zero);
        });

    // Mass path: drain the stream, empty the table, then insert every row
    bulkMergeHelper.MassInsertAsync(
            Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
            Arg.Any<string>(),
            Arg.Any<bool>(),
            Arg.Any<int>(),
            Arg.Any<CancellationToken>())
        .Returns(async callInfo =>
        {
            var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
            var records = await data.ToListAsync();

            await _connection.ExecuteAsync("TRUNCATE TABLE WorkOrder_Test");
            foreach (var record in records)
            {
                await InsertWorkOrderAsync(record);
            }

            return new MassInsertResult(records.Count, TimeSpan.Zero, true);
        });

    return bulkMergeHelper;
}

/// <summary>
/// Updates the WorkOrder_Test row with the record's OrderNumber if one exists; otherwise inserts the record.
/// </summary>
private async Task UpsertWorkOrderAsync(WorkOrderTestEntity record)
{
    var exists = await _connection.ExecuteScalarAsync<bool>(
        "SELECT CASE WHEN EXISTS (SELECT 1 FROM WorkOrder_Test WHERE OrderNumber = @OrderNumber) THEN 1 ELSE 0 END",
        new { record.OrderNumber });

    if (!exists)
    {
        await InsertWorkOrderAsync(record);
        return;
    }

    await _connection.ExecuteAsync(@"
        UPDATE WorkOrder_Test
        SET Status = @Status, Description = @Description, Quantity = @Quantity, LastUpdateDT = @LastUpdateDT
        WHERE OrderNumber = @OrderNumber",
        record);
}

/// <summary>
/// Inserts a single record into WorkOrder_Test over the test's own connection.
/// </summary>
private async Task InsertWorkOrderAsync(WorkOrderTestEntity record)
{
    await _connection.ExecuteAsync(@"
        INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
        VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
        record);
}
|
|
|
|
/// <summary>
/// Builds a <see cref="DataUpdateTask"/> (and its nested <see cref="DataSourceConfig"/>) for the given table.
/// </summary>
/// <param name="tableName">Target table; also used, upper-cased, as the source data name.</param>
/// <param name="updateType">Update type assigned to the task.</param>
/// <param name="minimumDt">Optional minimum timestamp assigned to the task.</param>
/// <param name="prepurge">Value for the Mass schedule's PrepurgeData flag.</param>
/// <param name="fetcherTypeName">
/// Fetcher type name stored in the config; defaults to <c>nameof(TestFetcher)</c> when null.
/// </param>
/// <returns>The populated task.</returns>
private static DataUpdateTask CreateTask(
    string tableName,
    UpdateTypes updateType,
    DateTime? minimumDt = null,
    bool prepurge = false,
    string? fetcherTypeName = null)
{
    const string sourceSystem = "JDE";

    // Upper-case with the invariant culture so the derived name does not depend on the
    // culture of the machine running the tests (e.g. Turkish casing of 'i').
    var sourceData = tableName.ToUpperInvariant();

    var config = new DataSourceConfig
    {
        TableName = tableName,
        SourceSystem = sourceSystem,
        SourceData = sourceData,
        FetcherTypeName = fetcherTypeName ?? nameof(TestFetcher),
        IsEnabled = true,
        MassConfig = new ScheduleConfig
        {
            Enabled = true,
            IntervalMinutes = 10080, // 7 days expressed in minutes
            PrepurgeData = prepurge,
            ReIndexData = false
        },
        DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 }, // 24 hours in minutes
        HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 }
    };

    return new DataUpdateTask
    {
        TableName = tableName,
        SourceSystem = sourceSystem,
        SourceData = sourceData,
        UpdateType = updateType,
        MinimumDt = minimumDt,
        Config = config
    };
}
|
|
|
|
/// <summary>
/// Seeds WorkOrder_Test by running one parameterised INSERT over the test connection with
/// <paramref name="records"/> as the parameter source, bypassing the operation under test.
/// </summary>
/// <param name="records">Rows to write into the table.</param>
private async Task InsertWorkOrdersDirectly(List<WorkOrderTestEntity> records)
{
    await _connection.ExecuteAsync(
        @"
        INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
        VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
        records);
}
|
|
|
|
#endregion
|
|
}
|
|
|
|
#region Test Fetchers
|
|
|
|
/// <summary>
/// In-memory fetcher for tests: streams back the list it was constructed with.
/// </summary>
public class TestFetcher : IDataFetcher<WorkOrderTestEntity>
{
    private readonly List<WorkOrderTestEntity> _records;

    /// <summary>Keeps a reference to the list that <see cref="FetchAsync"/> will stream.</summary>
    /// <param name="records">Rows to yield, in list order.</param>
    public TestFetcher(List<WorkOrderTestEntity> records) => _records = records;

    /// <summary>
    /// Yields the stored rows one at a time, in list order.
    /// </summary>
    /// <param name="minimumDt">Not consulted; every stored row is yielded regardless of its timestamp.</param>
    /// <param name="cancellationToken">
    /// Checked before each row. Once cancellation is requested the sequence simply ends;
    /// no exception is thrown.
    /// </param>
    /// <returns>An asynchronous sequence of the stored rows.</returns>
    public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (var workOrder in _records)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                yield break;
            }

            // Yield control before each row so the sequence is consumed asynchronously
            await Task.Yield();
            yield return workOrder;
        }
    }
}
|
|
|
|
/// <summary>
/// Fetcher for failure-path tests: yields a fixed number of generated rows and then throws.
/// </summary>
public class FailingFetcher : IDataFetcher<WorkOrderTestEntity>
{
    private readonly int _failAfterCount;

    /// <summary>Sets how many rows are yielded before the failure.</summary>
    /// <param name="failAfterCount">Number of rows to yield before throwing.</param>
    public FailingFetcher(int failAfterCount) => _failAfterCount = failAfterCount;

    /// <summary>
    /// Yields the configured number of rows (order numbers starting at 1, quantities in steps of 10
    /// starting at 0), then throws.
    /// </summary>
    /// <param name="minimumDt">Not consulted.</param>
    /// <param name="cancellationToken">Not consulted.</param>
    /// <returns>An asynchronous sequence that always ends in an exception.</returns>
    /// <exception cref="InvalidOperationException">Thrown after the configured rows have been yielded.</exception>
    public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
        DateTime? minimumDt,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var index = 0; index < _failAfterCount; index++)
        {
            await Task.Yield();

            var orderNumber = index + 1;
            var workOrder = new WorkOrderTestEntity
            {
                OrderNumber = orderNumber,
                Status = "Test",
                Description = $"Record {orderNumber}",
                Quantity = index * 10,
                LastUpdateDT = DateTime.UtcNow
            };

            yield return workOrder;
        }

        throw new InvalidOperationException("Simulated fetcher failure");
    }
}
|
|
|
|
#endregion
|