refactor(datasync): remove obsolete integration tests project
This commit is contained in:
@@ -19,7 +19,6 @@
|
||||
<Project Path="tests/JdeScoping.Core.Tests/JdeScoping.Core.Tests.csproj" />
|
||||
<Project Path="tests/JdeScoping.DataAccess.Tests/JdeScoping.DataAccess.Tests.csproj" />
|
||||
<Project Path="tests/JdeScoping.Database.Tests/JdeScoping.Database.Tests.csproj" />
|
||||
<Project Path="tests/JdeScoping.DataSync.IntegrationTests/JdeScoping.DataSync.IntegrationTests.csproj" />
|
||||
<Project Path="tests/JdeScoping.DataSync.Tests/JdeScoping.DataSync.Tests.csproj" />
|
||||
<Project Path="tests/JdeScoping.DataSync.Dev.Tests/JdeScoping.DataSync.Dev.Tests.csproj" />
|
||||
<Project Path="tests/JdeScoping.ExcelIO.Tests/JdeScoping.ExcelIO.Tests.csproj" />
|
||||
|
||||
@@ -1,298 +0,0 @@
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// Integration tests for BulkMergeHelper.
|
||||
/// These tests verify the bulk merge functionality against a real SQL Server database.
|
||||
/// </summary>
|
||||
[Collection("Database")]
|
||||
public class BulkMergeHelperTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SqlServerFixture _fixture;
|
||||
private readonly IBulkMergeHelper _bulkMergeHelper;
|
||||
|
||||
public BulkMergeHelperTests(SqlServerFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
|
||||
// Create the BulkMergeHelper with test dependencies
|
||||
var connectionFactory = new TestDbConnectionFactory(_fixture.ConnectionString);
|
||||
var dataReaderFactory = new TestDataReaderFactory();
|
||||
var schemaValidator = new SchemaValidator();
|
||||
var logger = NullLogger<BulkMergeHelper>.Instance;
|
||||
|
||||
_bulkMergeHelper = new BulkMergeHelper(
|
||||
connectionFactory,
|
||||
dataReaderFactory,
|
||||
schemaValidator,
|
||||
logger);
|
||||
}
|
||||
|
||||
public Task InitializeAsync() => _fixture.CleanupBulkMergeTestTableAsync();
|
||||
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
|
||||
#region Insert Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_NewRecords_InsertsAll()
|
||||
{
|
||||
// Arrange
|
||||
var data = GenerateTestData(10);
|
||||
|
||||
// Act
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
data.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(10);
|
||||
result.TotalRowsAffected.ShouldBeGreaterThan(0);
|
||||
result.BatchCount.ShouldBeGreaterThan(0);
|
||||
|
||||
// Verify in database
|
||||
await using var connection = await _fixture.CreateConnectionAsync();
|
||||
var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
|
||||
count.ShouldBe(10);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_EmptyData_ReturnsZeroRows()
|
||||
{
|
||||
// Arrange
|
||||
var data = Array.Empty<BulkMergeTestEntity>();
|
||||
|
||||
// Act
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
data.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id);
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(0);
|
||||
result.TotalRowsAffected.ShouldBe(0);
|
||||
result.BatchCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Update Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_ExistingRecords_UpdatesAll()
|
||||
{
|
||||
// Arrange - Insert initial data
|
||||
var initialData = GenerateTestData(5);
|
||||
await _bulkMergeHelper.MergeAsync(
|
||||
initialData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Modify the data
|
||||
var updatedData = initialData.Select(e => new BulkMergeTestEntity
|
||||
{
|
||||
Id = e.Id,
|
||||
Name = e.Name + "_Updated",
|
||||
Amount = (e.Amount ?? 0) + 100,
|
||||
LastUpdateDt = DateTime.UtcNow
|
||||
}).ToList();
|
||||
|
||||
// Act
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
updatedData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(5);
|
||||
result.TotalRowsAffected.ShouldBeGreaterThan(0);
|
||||
|
||||
// Verify in database
|
||||
await using var connection = await _fixture.CreateConnectionAsync();
|
||||
var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
|
||||
count.ShouldBe(5); // Still 5 records, not 10
|
||||
|
||||
var updatedCount = await connection.ExecuteScalarAsync<int>(
|
||||
"SELECT COUNT(*) FROM BulkMergeTest WHERE Name LIKE '%_Updated'");
|
||||
updatedCount.ShouldBe(5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_MixedRecords_InsertsAndUpdates()
|
||||
{
|
||||
// Arrange - Insert initial data
|
||||
var initialData = GenerateTestData(5);
|
||||
await _bulkMergeHelper.MergeAsync(
|
||||
initialData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Create mixed data: 3 updates + 2 inserts
|
||||
var mixedData = new List<BulkMergeTestEntity>();
|
||||
|
||||
// Updates
|
||||
for (int i = 0; i < 3; i++)
|
||||
{
|
||||
mixedData.Add(new BulkMergeTestEntity
|
||||
{
|
||||
Id = initialData[i].Id,
|
||||
Name = "Updated_" + i,
|
||||
Amount = 999m,
|
||||
LastUpdateDt = DateTime.UtcNow
|
||||
});
|
||||
}
|
||||
|
||||
// New inserts
|
||||
for (int i = 100; i < 102; i++)
|
||||
{
|
||||
mixedData.Add(new BulkMergeTestEntity
|
||||
{
|
||||
Id = i,
|
||||
Name = "New_" + i,
|
||||
Amount = i * 10m,
|
||||
LastUpdateDt = DateTime.UtcNow
|
||||
});
|
||||
}
|
||||
|
||||
// Act
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
mixedData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(5);
|
||||
result.TotalRowsAffected.ShouldBeGreaterThan(0);
|
||||
|
||||
// Verify in database
|
||||
await using var connection = await _fixture.CreateConnectionAsync();
|
||||
var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
|
||||
count.ShouldBe(7); // 5 initial + 2 new = 7 (3 updated in place)
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Conditional Update Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_WithUpdateWhen_OnlyUpdatesWhenConditionMet()
|
||||
{
|
||||
// Arrange - Insert initial data with old timestamp
|
||||
var oldDate = DateTime.UtcNow.AddDays(-1);
|
||||
var initialData = GenerateTestData(3, oldDate);
|
||||
await _bulkMergeHelper.MergeAsync(
|
||||
initialData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Create update data:
|
||||
// - First record has NEWER date (should update)
|
||||
// - Second record has OLDER date (should NOT update)
|
||||
// - Third record has SAME date (should NOT update)
|
||||
var newDate = DateTime.UtcNow;
|
||||
var olderDate = DateTime.UtcNow.AddDays(-2);
|
||||
|
||||
var updateData = new List<BulkMergeTestEntity>
|
||||
{
|
||||
new() { Id = initialData[0].Id, Name = "ShouldUpdate", Amount = 999m, LastUpdateDt = newDate },
|
||||
new() { Id = initialData[1].Id, Name = "ShouldNotUpdate", Amount = 888m, LastUpdateDt = olderDate },
|
||||
new() { Id = initialData[2].Id, Name = "ShouldNotUpdate", Amount = 777m, LastUpdateDt = oldDate }
|
||||
};
|
||||
|
||||
// Act
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
updateData.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
updateWhen: (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt,
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(3);
|
||||
|
||||
// Verify in database
|
||||
await using var connection = await _fixture.CreateConnectionAsync();
|
||||
var shouldUpdate = await connection.QuerySingleAsync<BulkMergeTestEntity>(
|
||||
"SELECT Id, Name, Amount, LastUpdateDt FROM BulkMergeTest WHERE Id = @Id",
|
||||
new { Id = initialData[0].Id });
|
||||
shouldUpdate.Name.ShouldBe("ShouldUpdate");
|
||||
|
||||
var shouldNotUpdate1 = await connection.QuerySingleAsync<BulkMergeTestEntity>(
|
||||
"SELECT Id, Name, Amount, LastUpdateDt FROM BulkMergeTest WHERE Id = @Id",
|
||||
new { Id = initialData[1].Id });
|
||||
shouldNotUpdate1.Name.ShouldNotBe("ShouldNotUpdate");
|
||||
|
||||
var shouldNotUpdate2 = await connection.QuerySingleAsync<BulkMergeTestEntity>(
|
||||
"SELECT Id, Name, Amount, LastUpdateDt FROM BulkMergeTest WHERE Id = @Id",
|
||||
new { Id = initialData[2].Id });
|
||||
shouldNotUpdate2.Name.ShouldNotBe("ShouldNotUpdate");
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Batching Tests
|
||||
|
||||
[Fact]
|
||||
public async Task MergeAsync_LargeDataset_ProcessesInBatches()
|
||||
{
|
||||
// Arrange
|
||||
var data = GenerateTestData(250);
|
||||
|
||||
// Act - Use small batch size to force multiple batches
|
||||
var result = await _bulkMergeHelper.MergeAsync(
|
||||
data.ToAsyncEnumerable(),
|
||||
"BulkMergeTest",
|
||||
x => x.Id,
|
||||
updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
|
||||
insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt },
|
||||
batchSize: 50);
|
||||
|
||||
// Assert
|
||||
result.TotalRowsProcessed.ShouldBe(250);
|
||||
result.BatchCount.ShouldBe(5); // 250 / 50 = 5 batches
|
||||
result.TotalRowsAffected.ShouldBeGreaterThan(0);
|
||||
|
||||
// Verify in database
|
||||
await using var connection = await _fixture.CreateConnectionAsync();
|
||||
var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
|
||||
count.ShouldBe(250);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helper Methods
|
||||
|
||||
private static List<BulkMergeTestEntity> GenerateTestData(int count, DateTime? lastUpdateDt = null)
|
||||
{
|
||||
var date = lastUpdateDt ?? DateTime.UtcNow;
|
||||
return Enumerable.Range(1, count)
|
||||
.Select(i => new BulkMergeTestEntity
|
||||
{
|
||||
Id = i,
|
||||
Name = $"TestItem_{i}",
|
||||
Amount = i * 10.5m,
|
||||
LastUpdateDt = date
|
||||
})
|
||||
.ToList();
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
global using Xunit;
|
||||
global using Shouldly;
|
||||
global using Microsoft.Data.SqlClient;
|
||||
global using Dapper;
|
||||
@@ -1,12 +0,0 @@
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Test entity for BulkMergeHelper integration tests.
|
||||
/// </summary>
|
||||
public class BulkMergeTestEntity
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public decimal? Amount { get; set; }
|
||||
public DateTime LastUpdateDt { get; set; }
|
||||
}
|
||||
-34
@@ -1,34 +0,0 @@
|
||||
using System.Data;
|
||||
using JdeScoping.DataSync.Generated;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// IDataReader implementation for BulkMergeTestEntity.
|
||||
/// </summary>
|
||||
public sealed class BulkMergeTestEntityDataReader : AsyncEnumerableDataReader<BulkMergeTestEntity>
|
||||
{
|
||||
private static readonly string[] _columnNames = ["Id", "Name", "Amount", "LastUpdateDt"];
|
||||
private static readonly Type[] _columnTypes = [typeof(int), typeof(string), typeof(decimal), typeof(DateTime)];
|
||||
|
||||
public BulkMergeTestEntityDataReader(IAsyncEnumerable<BulkMergeTestEntity> source) : base(source) { }
|
||||
|
||||
protected override string[] ColumnNames => _columnNames;
|
||||
|
||||
public static IReadOnlyList<string> GetColumnNames() => _columnNames;
|
||||
|
||||
protected override object GetColumnValue(int ordinal)
|
||||
{
|
||||
var entity = Current!;
|
||||
return ordinal switch
|
||||
{
|
||||
0 => entity.Id,
|
||||
1 => entity.Name,
|
||||
2 => entity.Amount ?? (object)DBNull.Value,
|
||||
3 => entity.LastUpdateDt,
|
||||
_ => throw new IndexOutOfRangeException()
|
||||
};
|
||||
}
|
||||
|
||||
protected override Type GetColumnType(int ordinal) => _columnTypes[ordinal];
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
using Testcontainers.MsSql;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Shared fixture that manages the SQL Server Testcontainer lifecycle.
|
||||
/// Container is started once per test collection and shared across all tests.
|
||||
/// </summary>
|
||||
public class SqlServerFixture : IAsyncLifetime
|
||||
{
|
||||
private readonly MsSqlContainer _container;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the connection string to the test SQL Server instance.
|
||||
/// </summary>
|
||||
public string ConnectionString => _container.GetConnectionString();
|
||||
|
||||
public SqlServerFixture()
|
||||
{
|
||||
_container = new MsSqlBuilder()
|
||||
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
|
||||
.WithPassword("Test@Password123!")
|
||||
.Build();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts the container and initializes the test database schema.
|
||||
/// </summary>
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
await _container.StartAsync();
|
||||
await TestDatabaseInitializer.InitializeAsync(ConnectionString);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops and disposes the container.
|
||||
/// </summary>
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _container.DisposeAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new open connection to the test database.
|
||||
/// Caller is responsible for disposing the connection.
|
||||
/// </summary>
|
||||
public async Task<SqlConnection> CreateConnectionAsync()
|
||||
{
|
||||
var connection = new SqlConnection(ConnectionString);
|
||||
await connection.OpenAsync();
|
||||
return connection;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Truncates all test tables to ensure clean state between tests.
|
||||
/// </summary>
|
||||
public async Task CleanupTablesAsync()
|
||||
{
|
||||
await using var connection = await CreateConnectionAsync();
|
||||
await connection.ExecuteAsync(@"
|
||||
TRUNCATE TABLE WorkOrder_Test;
|
||||
TRUNCATE TABLE Item_Test;
|
||||
TRUNCATE TABLE LotUsage_Test;
|
||||
TRUNCATE TABLE DataUpdate_Test;
|
||||
TRUNCATE TABLE BulkMergeTest;
|
||||
");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cleans up just the BulkMergeTest table.
|
||||
/// </summary>
|
||||
public async Task CleanupBulkMergeTestTableAsync()
|
||||
{
|
||||
await using var connection = await CreateConnectionAsync();
|
||||
await connection.ExecuteAsync("TRUNCATE TABLE BulkMergeTest;");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Collection definition for sharing the SQL Server fixture across test classes.
|
||||
/// </summary>
|
||||
[CollectionDefinition("Database")]
|
||||
public class DatabaseCollection : ICollectionFixture<SqlServerFixture>
|
||||
{
|
||||
// This class has no code, and is never created.
|
||||
// Its purpose is to be the place to apply [CollectionDefinition]
|
||||
// and all the ICollectionFixture<> interfaces.
|
||||
}
|
||||
@@ -1,142 +0,0 @@
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Generates test data for integration tests.
|
||||
/// </summary>
|
||||
public static class TestDataGenerator
|
||||
{
|
||||
/// <summary>
|
||||
/// Generates a list of WorkOrder entities with sequential IDs.
|
||||
/// </summary>
|
||||
public static List<WorkOrderTestEntity> GenerateWorkOrders(int count, DateTime? baseTime = null)
|
||||
{
|
||||
var time = baseTime ?? DateTime.UtcNow;
|
||||
return Enumerable.Range(1, count)
|
||||
.Select(i => new WorkOrderTestEntity
|
||||
{
|
||||
OrderNumber = i,
|
||||
Status = i % 2 == 0 ? "Active" : "Closed",
|
||||
Description = $"Work Order {i}",
|
||||
Quantity = i * 10.5m,
|
||||
LastUpdateDT = time.AddMinutes(-i)
|
||||
})
|
||||
.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generates WorkOrders with duplicate primary keys (for deduplication testing).
|
||||
/// Each OrderNumber appears twice with different timestamps.
|
||||
/// </summary>
|
||||
public static List<WorkOrderTestEntity> GenerateWorkOrdersWithDuplicates(int uniqueCount, DateTime baseTime)
|
||||
{
|
||||
var orders = new List<WorkOrderTestEntity>();
|
||||
|
||||
for (var i = 1; i <= uniqueCount; i++)
|
||||
{
|
||||
// Older version
|
||||
orders.Add(new WorkOrderTestEntity
|
||||
{
|
||||
OrderNumber = i,
|
||||
Status = "Old",
|
||||
Description = $"Work Order {i} - Old",
|
||||
Quantity = i * 10m,
|
||||
LastUpdateDT = baseTime.AddHours(-2)
|
||||
});
|
||||
|
||||
// Newer version (should be kept after deduplication)
|
||||
orders.Add(new WorkOrderTestEntity
|
||||
{
|
||||
OrderNumber = i,
|
||||
Status = "New",
|
||||
Description = $"Work Order {i} - New",
|
||||
Quantity = i * 20m,
|
||||
LastUpdateDT = baseTime
|
||||
});
|
||||
}
|
||||
|
||||
return orders;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generates Item entities (no LastUpdateDT column).
|
||||
/// </summary>
|
||||
public static List<ItemTestEntity> GenerateItems(int count)
|
||||
{
|
||||
return Enumerable.Range(1, count)
|
||||
.Select(i => new ItemTestEntity
|
||||
{
|
||||
ItemNumber = $"ITEM{i:D6}",
|
||||
Description = $"Item {i}",
|
||||
UnitOfMeasure = i % 3 == 0 ? "EA" : (i % 3 == 1 ? "KG" : "LB")
|
||||
})
|
||||
.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generates LotUsage entities with composite primary key.
|
||||
/// </summary>
|
||||
public static List<LotUsageTestEntity> GenerateLotUsages(int count, DateTime? baseTime = null)
|
||||
{
|
||||
var time = baseTime ?? DateTime.UtcNow;
|
||||
return Enumerable.Range(1, count)
|
||||
.Select(i => new LotUsageTestEntity
|
||||
{
|
||||
LotNumber = $"LOT{i:D6}",
|
||||
OrderNumber = (i % 10) + 1, // Reuse order numbers
|
||||
Quantity = i * 5.25m,
|
||||
LastUpdateDT = time.AddMinutes(-i)
|
||||
})
|
||||
.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generates a large dataset for batching tests.
|
||||
/// </summary>
|
||||
public static List<WorkOrderTestEntity> GenerateLargeDataset(int count)
|
||||
{
|
||||
var time = DateTime.UtcNow;
|
||||
return Enumerable.Range(1, count)
|
||||
.Select(i => new WorkOrderTestEntity
|
||||
{
|
||||
OrderNumber = i,
|
||||
Status = "Active",
|
||||
Description = $"WO-{i}",
|
||||
Quantity = i,
|
||||
LastUpdateDT = time
|
||||
})
|
||||
.ToList();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test entity matching WorkOrder_Test table schema.
|
||||
/// </summary>
|
||||
public class WorkOrderTestEntity
|
||||
{
|
||||
public int OrderNumber { get; set; }
|
||||
public string? Status { get; set; }
|
||||
public string? Description { get; set; }
|
||||
public decimal? Quantity { get; set; }
|
||||
public DateTime LastUpdateDT { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test entity matching Item_Test table schema (no LastUpdateDT).
|
||||
/// </summary>
|
||||
public class ItemTestEntity
|
||||
{
|
||||
public string ItemNumber { get; set; } = string.Empty;
|
||||
public string? Description { get; set; }
|
||||
public string? UnitOfMeasure { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test entity matching LotUsage_Test table schema (composite PK).
|
||||
/// </summary>
|
||||
public class LotUsageTestEntity
|
||||
{
|
||||
public string LotNumber { get; set; } = string.Empty;
|
||||
public int OrderNumber { get; set; }
|
||||
public decimal? Quantity { get; set; }
|
||||
public DateTime LastUpdateDT { get; set; }
|
||||
}
|
||||
-37
@@ -1,37 +0,0 @@
|
||||
using System.Data;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.Generated;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// DataReaderFactory for integration tests that supports both test entities and production entities.
|
||||
/// </summary>
|
||||
public class TestDataReaderFactory : IDataReaderFactory
|
||||
{
|
||||
private readonly DataReaderFactory _innerFactory = new();
|
||||
|
||||
public IDataReader CreateReader<T>(IAsyncEnumerable<T> source) where T : class
|
||||
{
|
||||
// Handle test entity
|
||||
if (typeof(T) == typeof(BulkMergeTestEntity))
|
||||
{
|
||||
return new BulkMergeTestEntityDataReader((IAsyncEnumerable<BulkMergeTestEntity>)(object)source);
|
||||
}
|
||||
|
||||
// Delegate to production factory for other types
|
||||
return _innerFactory.CreateReader(source);
|
||||
}
|
||||
|
||||
public IReadOnlyList<string> GetColumnNames<T>() where T : class
|
||||
{
|
||||
// Handle test entity
|
||||
if (typeof(T) == typeof(BulkMergeTestEntity))
|
||||
{
|
||||
return BulkMergeTestEntityDataReader.GetColumnNames();
|
||||
}
|
||||
|
||||
// Delegate to production factory for other types
|
||||
return _innerFactory.GetColumnNames<T>();
|
||||
}
|
||||
}
|
||||
-91
@@ -1,91 +0,0 @@
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes the test database schema.
|
||||
/// Creates test tables that mirror production schemas.
|
||||
/// </summary>
|
||||
public static class TestDatabaseInitializer
|
||||
{
|
||||
/// <summary>
|
||||
/// Creates all test tables in the database.
|
||||
/// </summary>
|
||||
public static async Task InitializeAsync(string connectionString)
|
||||
{
|
||||
await using var connection = new SqlConnection(connectionString);
|
||||
await connection.OpenAsync();
|
||||
|
||||
// WorkOrder_Test: For MERGE and bulk copy tests (has LastUpdateDT)
|
||||
await connection.ExecuteAsync(@"
|
||||
IF OBJECT_ID('WorkOrder_Test', 'U') IS NOT NULL
|
||||
DROP TABLE WorkOrder_Test;
|
||||
|
||||
CREATE TABLE WorkOrder_Test (
|
||||
OrderNumber INT NOT NULL PRIMARY KEY,
|
||||
Status VARCHAR(10) NULL,
|
||||
Description VARCHAR(100) NULL,
|
||||
Quantity DECIMAL(18,4) NULL,
|
||||
LastUpdateDT DATETIME2 NOT NULL
|
||||
);
|
||||
|
||||
CREATE NONCLUSTERED INDEX IX_WorkOrder_Test_Status
|
||||
ON WorkOrder_Test(Status);
|
||||
");
|
||||
|
||||
// Item_Test: For tables WITHOUT LastUpdateDT (unconditional update)
|
||||
await connection.ExecuteAsync(@"
|
||||
IF OBJECT_ID('Item_Test', 'U') IS NOT NULL
|
||||
DROP TABLE Item_Test;
|
||||
|
||||
CREATE TABLE Item_Test (
|
||||
ItemNumber VARCHAR(25) NOT NULL PRIMARY KEY,
|
||||
Description VARCHAR(100) NULL,
|
||||
UnitOfMeasure VARCHAR(10) NULL
|
||||
);
|
||||
");
|
||||
|
||||
// LotUsage_Test: For composite primary key tests
|
||||
await connection.ExecuteAsync(@"
|
||||
IF OBJECT_ID('LotUsage_Test', 'U') IS NOT NULL
|
||||
DROP TABLE LotUsage_Test;
|
||||
|
||||
CREATE TABLE LotUsage_Test (
|
||||
LotNumber VARCHAR(30) NOT NULL,
|
||||
OrderNumber INT NOT NULL,
|
||||
Quantity DECIMAL(18,4) NULL,
|
||||
LastUpdateDT DATETIME2 NOT NULL,
|
||||
CONSTRAINT PK_LotUsage_Test PRIMARY KEY (LotNumber, OrderNumber)
|
||||
);
|
||||
");
|
||||
|
||||
// DataUpdate_Test: For update logging tests
|
||||
await connection.ExecuteAsync(@"
|
||||
IF OBJECT_ID('DataUpdate_Test', 'U') IS NOT NULL
|
||||
DROP TABLE DataUpdate_Test;
|
||||
|
||||
CREATE TABLE DataUpdate_Test (
|
||||
Id INT IDENTITY(1,1) PRIMARY KEY,
|
||||
TableName VARCHAR(50) NOT NULL,
|
||||
SourceSystem VARCHAR(10) NOT NULL,
|
||||
SourceData VARCHAR(50) NOT NULL,
|
||||
UpdateType INT NOT NULL,
|
||||
StartDT DATETIME2 NOT NULL,
|
||||
EndDT DATETIME2 NULL,
|
||||
NumberRecords INT NOT NULL,
|
||||
WasSuccessful BIT NULL
|
||||
);
|
||||
");
|
||||
|
||||
// BulkMergeTest: For BulkMergeHelper integration tests
|
||||
await connection.ExecuteAsync(@"
|
||||
IF OBJECT_ID('BulkMergeTest', 'U') IS NOT NULL
|
||||
DROP TABLE BulkMergeTest;
|
||||
|
||||
CREATE TABLE BulkMergeTest (
|
||||
Id INT NOT NULL PRIMARY KEY,
|
||||
Name NVARCHAR(100) NOT NULL,
|
||||
Amount DECIMAL(18,2) NULL,
|
||||
LastUpdateDt DATETIME2 NOT NULL
|
||||
);
|
||||
");
|
||||
}
|
||||
}
|
||||
-39
@@ -1,39 +0,0 @@
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using Oracle.ManagedDataAccess.Client;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Connection factory for integration tests that uses the test container connection string.
|
||||
/// </summary>
|
||||
public class TestDbConnectionFactory : IDbConnectionFactory
|
||||
{
|
||||
private readonly string _connectionString;
|
||||
|
||||
public TestDbConnectionFactory(string connectionString)
|
||||
{
|
||||
_connectionString = connectionString;
|
||||
}
|
||||
|
||||
public async Task<SqlConnection> CreateLotFinderConnectionAsync(CancellationToken ct = default)
|
||||
{
|
||||
var connection = new SqlConnection(_connectionString);
|
||||
await connection.OpenAsync(ct);
|
||||
return connection;
|
||||
}
|
||||
|
||||
public Task<OracleConnection> CreateJdeConnectionAsync(CancellationToken ct = default)
|
||||
{
|
||||
throw new NotImplementedException("JDE connection not supported in integration tests");
|
||||
}
|
||||
|
||||
public Task<OracleConnection> CreateJdeStageConnectionAsync(CancellationToken ct = default)
|
||||
{
|
||||
throw new NotImplementedException("JDE Stage connection not supported in integration tests");
|
||||
}
|
||||
|
||||
public Task<OracleConnection> CreateCmsConnectionAsync(CancellationToken ct = default)
|
||||
{
|
||||
throw new NotImplementedException("CMS connection not supported in integration tests");
|
||||
}
|
||||
}
|
||||
-40
@@ -1,40 +0,0 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.4" />
|
||||
<PackageReference Include="Dapper" Version="2.1.66" />
|
||||
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.3" />
|
||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="10.0.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Diagnostics" Version="10.0.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.1" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
|
||||
<PackageReference Include="NSubstitute" Version="5.3.0" />
|
||||
<PackageReference Include="Shouldly" Version="4.3.0" />
|
||||
<PackageReference Include="Testcontainers.MsSql" Version="4.3.0" />
|
||||
<PackageReference Include="xunit" Version="2.9.3" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4">
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Using Include="Xunit" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\JdeScoping.Core\JdeScoping.Core.csproj" />
|
||||
<ProjectReference Include="..\..\src\JdeScoping.DataAccess\JdeScoping.DataAccess.csproj" />
|
||||
<ProjectReference Include="..\..\src\JdeScoping.DataSync\JdeScoping.DataSync.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -1,452 +0,0 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Linq.Expressions;
|
||||
using System.Runtime.CompilerServices;
|
||||
using JdeScoping.Core.Interfaces;
|
||||
using JdeScoping.Core.Models.Enums;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Options;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
||||
using JdeScoping.DataSync.Models;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using JdeScoping.DataSync.Telemetry;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NSubstitute;
|
||||
|
||||
namespace JdeScoping.DataSync.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// Integration tests for TableSyncOperation.
|
||||
/// Tests end-to-end sync paths (incremental and mass) against real SQL Server.
|
||||
/// </summary>
|
||||
[Collection("Database")]
|
||||
public class TableSyncOperationTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SqlServerFixture _fixture;
|
||||
private SqlConnection _connection = null!;
|
||||
|
||||
public TableSyncOperationTests(SqlServerFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
_connection = await _fixture.CreateConnectionAsync();
|
||||
await _fixture.CleanupTablesAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _connection.DisposeAsync();
|
||||
}
|
||||
|
||||
#region Daily Update (Incremental Path) Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_DailyUpdate_UsesStagingAndMerge()
|
||||
{
|
||||
// Arrange
|
||||
var baseTime = DateTime.UtcNow;
|
||||
|
||||
// Pre-populate with some existing records
|
||||
var existingRecords = new List<WorkOrderTestEntity>
|
||||
{
|
||||
new() { OrderNumber = 1, Status = "Old", Description = "Existing 1", Quantity = 10, LastUpdateDT = baseTime.AddHours(-2) },
|
||||
new() { OrderNumber = 2, Status = "Old", Description = "Existing 2", Quantity = 20, LastUpdateDT = baseTime.AddHours(-2) }
|
||||
};
|
||||
await InsertWorkOrdersDirectly(existingRecords);
|
||||
|
||||
// New records to sync (one update, one insert)
|
||||
var syncRecords = new List<WorkOrderTestEntity>
|
||||
{
|
||||
new() { OrderNumber = 1, Status = "Updated", Description = "Updated 1", Quantity = 100, LastUpdateDT = baseTime },
|
||||
new() { OrderNumber = 3, Status = "New", Description = "New 3", Quantity = 30, LastUpdateDT = baseTime }
|
||||
};
|
||||
|
||||
var sut = CreateTableSyncOperation(syncRecords);
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: baseTime.AddDays(-1));
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
var results = (await _connection.QueryAsync<WorkOrderTestEntity>(
|
||||
"SELECT * FROM WorkOrder_Test ORDER BY OrderNumber")).ToList();
|
||||
|
||||
results.Count.ShouldBe(3);
|
||||
|
||||
// Record 1 should be updated
|
||||
results[0].OrderNumber.ShouldBe(1);
|
||||
results[0].Status.ShouldBe("Updated");
|
||||
results[0].Quantity.ShouldBe(100);
|
||||
|
||||
// Record 2 should be unchanged (not in sync batch)
|
||||
results[1].OrderNumber.ShouldBe(2);
|
||||
results[1].Status.ShouldBe("Old");
|
||||
|
||||
// Record 3 should be inserted
|
||||
results[2].OrderNumber.ShouldBe(3);
|
||||
results[2].Status.ShouldBe("New");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_HourlyUpdate_UsesStagingAndMerge()
|
||||
{
|
||||
// Arrange
|
||||
var syncRecords = TestDataGenerator.GenerateWorkOrders(15);
|
||||
var sut = CreateTableSyncOperation(syncRecords);
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Hourly, minimumDt: DateTime.UtcNow.AddHours(-1));
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
var count = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
|
||||
count.ShouldBe(15);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Mass Update (Truncate Path) Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_MassUpdate_UsesTruncatePath()
|
||||
{
|
||||
// Arrange: Pre-populate table
|
||||
var existingRecords = TestDataGenerator.GenerateWorkOrders(50);
|
||||
await InsertWorkOrdersDirectly(existingRecords);
|
||||
|
||||
var initialCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
|
||||
initialCount.ShouldBe(50);
|
||||
|
||||
// New mass sync with different records
|
||||
var massRecords = TestDataGenerator.GenerateWorkOrders(30);
|
||||
var sut = CreateTableSyncOperation(massRecords);
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: Table should be truncated and reloaded
|
||||
var finalCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
|
||||
finalCount.ShouldBe(30);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_MassUpdate_WithEmptyRecords_TruncatesTable()
|
||||
{
|
||||
// Arrange: Pre-populate table
|
||||
var existingRecords = TestDataGenerator.GenerateWorkOrders(25);
|
||||
await InsertWorkOrdersDirectly(existingRecords);
|
||||
|
||||
var sut = CreateTableSyncOperation(new List<WorkOrderTestEntity>());
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: Table should be empty
|
||||
var count = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
|
||||
count.ShouldBe(0);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Temp Table Cleanup Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_OnSuccess_CleansTempTable()
|
||||
{
|
||||
// Arrange
|
||||
var syncRecords = TestDataGenerator.GenerateWorkOrders(10);
|
||||
var sut = CreateTableSyncOperation(syncRecords);
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1));
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert: No temp tables should remain (BulkMergeHelper uses #TEMP_* naming)
|
||||
var tempTableCount = await _connection.ExecuteScalarAsync<int>(@"
|
||||
SELECT COUNT(*)
|
||||
FROM tempdb.sys.tables
|
||||
WHERE name LIKE '#TEMP[_]%'");
|
||||
|
||||
tempTableCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_OnFetcherError_CleansTempTable()
|
||||
{
|
||||
// Arrange: Create a fetcher that throws after yielding some records
|
||||
var failingFetcher = new FailingFetcher(failAfterCount: 5);
|
||||
|
||||
var sut = CreateTableSyncOperationWithFetcher(failingFetcher);
|
||||
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1), fetcherTypeName: nameof(FailingFetcher));
|
||||
|
||||
// Act & Assert
|
||||
await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
|
||||
|
||||
// Temp tables should still be cleaned up
|
||||
var tempTableCount = await _connection.ExecuteScalarAsync<int>(@"
|
||||
SELECT COUNT(*)
|
||||
FROM tempdb.sys.tables
|
||||
WHERE name LIKE '#TEMP[_]%'");
|
||||
|
||||
tempTableCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helper Methods
|
||||
|
||||
private TableSyncOperation CreateTableSyncOperation(List<WorkOrderTestEntity> records)
|
||||
{
|
||||
var fetcher = new TestFetcher(records);
|
||||
return CreateTableSyncOperationWithFetcher(fetcher);
|
||||
}
|
||||
|
||||
private TableSyncOperation CreateTableSyncOperationWithFetcher(IDataFetcher<WorkOrderTestEntity> fetcher)
|
||||
{
|
||||
var services = new ServiceCollection();
|
||||
services.AddMetrics();
|
||||
services.AddSingleton(fetcher);
|
||||
var provider = services.BuildServiceProvider();
|
||||
|
||||
var connectionFactory = Substitute.For<IDbConnectionFactory>();
|
||||
connectionFactory.CreateLotFinderConnectionAsync(Arg.Any<CancellationToken>())
|
||||
.Returns(async _ =>
|
||||
{
|
||||
var conn = new SqlConnection(_fixture.ConnectionString);
|
||||
await conn.OpenAsync();
|
||||
return conn;
|
||||
});
|
||||
|
||||
var updateRepository = Substitute.For<IDataUpdateRepository>();
|
||||
updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(), Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
// Setup mock BulkMergeHelper for integration tests - return appropriate results
|
||||
var bulkMergeHelper = Substitute.For<IBulkMergeHelper>();
|
||||
|
||||
// Setup mock merge configuration registry
|
||||
var configRegistry = Substitute.For<IMergeConfigurationRegistry>();
|
||||
var mockConfig = Substitute.For<IMergeConfiguration<WorkOrderTestEntity>>();
|
||||
mockConfig.TableName.Returns("WorkOrder_Test");
|
||||
mockConfig.MatchOn.Returns(x => x.OrderNumber);
|
||||
mockConfig.UpdateColumns.Returns(x => new { x.Status, x.Description, x.Quantity, x.LastUpdateDT });
|
||||
mockConfig.UpdateWhen.Returns((src, tgt) => src.LastUpdateDT > tgt.LastUpdateDT);
|
||||
mockConfig.InsertColumns.Returns((Expression<Func<WorkOrderTestEntity, object>>?)null);
|
||||
configRegistry.GetConfiguration<WorkOrderTestEntity>().Returns(mockConfig);
|
||||
|
||||
// Setup BulkMergeHelper to return results based on actual data processing
|
||||
bulkMergeHelper.MergeAsync(
|
||||
Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
|
||||
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
|
||||
Arg.Any<Expression<Func<WorkOrderTestEntity, WorkOrderTestEntity, bool>>>(),
|
||||
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(async callInfo =>
|
||||
{
|
||||
var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
|
||||
var records = await data.ToListAsync();
|
||||
|
||||
// Actually perform the merge against the real database using Dapper
|
||||
foreach (var record in records)
|
||||
{
|
||||
var exists = await _connection.ExecuteScalarAsync<bool>(
|
||||
"SELECT CASE WHEN EXISTS (SELECT 1 FROM WorkOrder_Test WHERE OrderNumber = @OrderNumber) THEN 1 ELSE 0 END",
|
||||
new { record.OrderNumber });
|
||||
|
||||
if (exists)
|
||||
{
|
||||
await _connection.ExecuteAsync(@"
|
||||
UPDATE WorkOrder_Test
|
||||
SET Status = @Status, Description = @Description, Quantity = @Quantity, LastUpdateDT = @LastUpdateDT
|
||||
WHERE OrderNumber = @OrderNumber",
|
||||
record);
|
||||
}
|
||||
else
|
||||
{
|
||||
await _connection.ExecuteAsync(@"
|
||||
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
|
||||
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
|
||||
record);
|
||||
}
|
||||
}
|
||||
|
||||
return new MergeResult(
|
||||
records.Count,
|
||||
records.Count,
|
||||
0,
|
||||
0,
|
||||
TimeSpan.Zero);
|
||||
});
|
||||
|
||||
bulkMergeHelper.MassInsertAsync(
|
||||
Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<bool>(),
|
||||
Arg.Any<int>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(async callInfo =>
|
||||
{
|
||||
var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
|
||||
var records = await data.ToListAsync();
|
||||
|
||||
// Truncate and insert into real database
|
||||
await _connection.ExecuteAsync("TRUNCATE TABLE WorkOrder_Test");
|
||||
foreach (var record in records)
|
||||
{
|
||||
await _connection.ExecuteAsync(@"
|
||||
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
|
||||
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
|
||||
record);
|
||||
}
|
||||
|
||||
return new MassInsertResult(records.Count, TimeSpan.Zero, true);
|
||||
});
|
||||
|
||||
var options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions
|
||||
{
|
||||
BatchSize = 1000,
|
||||
BulkCopyBatchSize = 100
|
||||
});
|
||||
|
||||
var meterFactory = provider.GetRequiredService<IMeterFactory>();
|
||||
var metrics = new DataSyncMetrics(meterFactory);
|
||||
|
||||
// Create a service provider that can resolve our test fetcher
|
||||
var testServiceProvider = Substitute.For<IServiceProvider>();
|
||||
testServiceProvider.GetService(typeof(TestFetcher)).Returns(fetcher);
|
||||
testServiceProvider.GetService(typeof(FailingFetcher)).Returns(fetcher);
|
||||
|
||||
return new TableSyncOperation(
|
||||
testServiceProvider,
|
||||
connectionFactory,
|
||||
updateRepository,
|
||||
bulkMergeHelper,
|
||||
configRegistry,
|
||||
options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
metrics);
|
||||
}
|
||||
|
||||
private static DataUpdateTask CreateTask(
|
||||
string tableName,
|
||||
UpdateTypes updateType,
|
||||
DateTime? minimumDt = null,
|
||||
bool prepurge = false,
|
||||
string? fetcherTypeName = null)
|
||||
{
|
||||
return new DataUpdateTask
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
UpdateType = updateType,
|
||||
MinimumDt = minimumDt,
|
||||
Config = new DataSourceConfig
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
FetcherTypeName = fetcherTypeName ?? nameof(TestFetcher),
|
||||
IsEnabled = true,
|
||||
MassConfig = new ScheduleConfig
|
||||
{
|
||||
Enabled = true,
|
||||
IntervalMinutes = 10080,
|
||||
PrepurgeData = prepurge,
|
||||
ReIndexData = false
|
||||
},
|
||||
DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 },
|
||||
HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 }
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private async Task InsertWorkOrdersDirectly(List<WorkOrderTestEntity> records)
|
||||
{
|
||||
const string sql = @"
|
||||
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
|
||||
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)";
|
||||
|
||||
await _connection.ExecuteAsync(sql, records);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
|
||||
#region Test Fetchers
|
||||
|
||||
/// <summary>
|
||||
/// Test fetcher that yields records from an in-memory list.
|
||||
/// </summary>
|
||||
public class TestFetcher : IDataFetcher<WorkOrderTestEntity>
|
||||
{
|
||||
private readonly List<WorkOrderTestEntity> _records;
|
||||
|
||||
public TestFetcher(List<WorkOrderTestEntity> records)
|
||||
{
|
||||
_records = records;
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
|
||||
DateTime? minimumDt,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
foreach (var record in _records)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
yield break;
|
||||
|
||||
await Task.Yield(); // Simulate async behavior
|
||||
yield return record;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test fetcher that fails after yielding a specified number of records.
|
||||
/// </summary>
|
||||
public class FailingFetcher : IDataFetcher<WorkOrderTestEntity>
|
||||
{
|
||||
private readonly int _failAfterCount;
|
||||
|
||||
public FailingFetcher(int failAfterCount)
|
||||
{
|
||||
_failAfterCount = failAfterCount;
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
|
||||
DateTime? minimumDt,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
for (var i = 0; i < _failAfterCount; i++)
|
||||
{
|
||||
await Task.Yield();
|
||||
yield return new WorkOrderTestEntity
|
||||
{
|
||||
OrderNumber = i + 1,
|
||||
Status = "Test",
|
||||
Description = $"Record {i + 1}",
|
||||
Quantity = i * 10,
|
||||
LastUpdateDT = DateTime.UtcNow
|
||||
};
|
||||
}
|
||||
|
||||
throw new InvalidOperationException("Simulated fetcher failure");
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
Reference in New Issue
Block a user