feat(datasync): wire TableSyncOperation to use EtlPipelineFactory

Replace the old sync logic (fetchers, merge configurations, bulk merge
helper, post processors) with the new ETL pipeline factory.

Changes:
- Inject IEtlPipelineFactory instead of old dependencies
- Remove IServiceProvider, IDbConnectionFactory, IBulkMergeHelper,
  IMergeConfigurationRegistry dependencies
- Simplify ExecuteSyncCoreAsync to build and execute pipeline
- Keep DataUpdateRepository calls for tracking sync timestamps
- Determine SyncMode from UpdateType (Mass vs Incremental)
This commit is contained in:
Joseph Doherty
2026-01-06 14:05:56 -05:00
parent 01da261d6c
commit 981c410cb3
@@ -1,29 +1,22 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Interfaces;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Executes a single table sync operation.
/// Executes a single table sync operation using the ETL pipeline.
/// </summary>
public class TableSyncOperation : ITableSyncOperation
{
private readonly IServiceProvider _serviceProvider;
private readonly IDbConnectionFactory _connectionFactory;
private readonly IEtlPipelineFactory _pipelineFactory;
private readonly IDataUpdateRepository _updateRepository;
private readonly IBulkMergeHelper _bulkMergeHelper;
private readonly IMergeConfigurationRegistry _configRegistry;
private readonly IOptions<DataSyncOptions> _options;
private readonly ILogger<TableSyncOperation> _logger;
private readonly DataSyncMetrics _metrics;
@@ -32,20 +25,14 @@ public class TableSyncOperation : ITableSyncOperation
/// Initializes a new instance of the <see cref="TableSyncOperation"/> class.
/// </summary>
public TableSyncOperation(
IServiceProvider serviceProvider,
IDbConnectionFactory connectionFactory,
IEtlPipelineFactory pipelineFactory,
IDataUpdateRepository updateRepository,
IBulkMergeHelper bulkMergeHelper,
IMergeConfigurationRegistry configRegistry,
IOptions<DataSyncOptions> options,
ILogger<TableSyncOperation> logger,
DataSyncMetrics metrics)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_pipelineFactory = pipelineFactory ?? throw new ArgumentNullException(nameof(pipelineFactory));
_updateRepository = updateRepository ?? throw new ArgumentNullException(nameof(updateRepository));
_bulkMergeHelper = bulkMergeHelper ?? throw new ArgumentNullException(nameof(bulkMergeHelper));
_configRegistry = configRegistry ?? throw new ArgumentNullException(nameof(configRegistry));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
@@ -142,274 +129,31 @@ public class TableSyncOperation : ITableSyncOperation
}
/// <summary>
/// Core sync logic that handles mass vs incremental updates.
/// Core sync logic that uses the ETL pipeline.
/// </summary>
private async Task<long> ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
// Get the fetcher for this entity type
var fetcherType = ResolveFetcherType(task.Config.FetcherTypeName);
var fetcher = _serviceProvider.GetRequiredService(fetcherType);
// Determine sync mode based on update type
var syncMode = task.UpdateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
// Use reflection to call FetchAsync on the fetcher
var fetchMethod = fetcher.GetType().GetMethod("FetchAsync")
?? throw new InvalidOperationException($"FetchAsync method not found on {fetcher.GetType().Name}");
_logger.LogDebug("Building pipeline for {Table} in {Mode} mode", task.TableName, syncMode);
var asyncEnumerable = fetchMethod.Invoke(fetcher, [task.MinimumDt, cancellationToken]);
// Build and execute the pipeline
var pipeline = _pipelineFactory
.ForTable(task.TableName)
.WithMode(syncMode)
.WithMinimumDate(task.MinimumDt)
.Build();
// Get the element type for typed operations
var (_, elementType) = FindGetAsyncEnumerator(asyncEnumerable!.GetType());
if (elementType == null || !elementType.IsClass)
var result = await pipeline.ExecuteAsync(cancellationToken);
if (!result.Success)
{
throw new InvalidOperationException(
$"Fetcher element type must be a class: {asyncEnumerable.GetType().Name}");
$"Pipeline failed for {task.TableName}: {result.Error?.Message ?? "Unknown error"}",
result.Error);
}
// Handle mass update with truncation
if (task.UpdateType == UpdateTypes.Mass && task.ScheduleConfig.PrepurgeData)
{
return await ExecuteMassUpdateAsync(
asyncEnumerable!,
task,
elementType,
cancellationToken);
}
// Handle incremental update with merge
return await ExecuteIncrementalUpdateAsync(
asyncEnumerable!,
task,
elementType,
cancellationToken);
}
/// <summary>
/// Executes mass update using BulkMergeHelper.
/// </summary>
private Task<long> ExecuteMassUpdateAsync(
object asyncEnumerable,
DataUpdateTask task,
Type elementType,
CancellationToken cancellationToken)
{
_logger.LogDebug("Executing mass update for {Table}", task.TableName);
// Use typed helper to call MassInsertAsync with correct type parameter
var helper = typeof(TableSyncOperation)
.GetMethod(nameof(ExecuteMassUpdateTypedAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
.MakeGenericMethod(elementType);
return (Task<long>)helper.Invoke(this, [
asyncEnumerable,
task,
cancellationToken
])!;
}
/// <summary>
/// Typed helper for mass update.
/// </summary>
private async Task<long> ExecuteMassUpdateTypedAsync<T>(
IAsyncEnumerable<T> data,
DataUpdateTask task,
CancellationToken cancellationToken) where T : class
{
var config = _configRegistry.GetConfiguration<T>();
var result = await _bulkMergeHelper.MassInsertAsync(
data,
config.TableName,
rebuildIndexes: task.ScheduleConfig.ReIndexData,
batchSize: _options.Value.BulkCopyBatchSize,
cancellationToken: cancellationToken);
// Run post processor if configured
await RunPostProcessorAsync(task, cancellationToken);
return result.TotalRowsInserted;
}
/// <summary>
/// Executes incremental update using BulkMergeHelper.MergeAsync.
/// </summary>
private Task<long> ExecuteIncrementalUpdateAsync(
object asyncEnumerable,
DataUpdateTask task,
Type elementType,
CancellationToken cancellationToken)
{
_logger.LogDebug("Executing incremental update for {Table}", task.TableName);
// Use typed helper to call MergeAsync with correct type parameter
var helper = typeof(TableSyncOperation)
.GetMethod(nameof(ExecuteIncrementalUpdateTypedAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
.MakeGenericMethod(elementType);
return (Task<long>)helper.Invoke(this, [
asyncEnumerable,
task,
cancellationToken
])!;
}
/// <summary>
/// Typed helper for incremental update.
/// </summary>
private async Task<long> ExecuteIncrementalUpdateTypedAsync<T>(
IAsyncEnumerable<T> data,
DataUpdateTask task,
CancellationToken cancellationToken) where T : class
{
var config = _configRegistry.GetConfiguration<T>();
var result = await _bulkMergeHelper.MergeAsync(
data,
config.TableName,
config.MatchOn,
config.UpdateColumns,
config.UpdateWhen,
config.InsertColumns,
batchSize: _options.Value.BatchSize,
cancellationToken: cancellationToken);
// Run post processor if configured
await RunPostProcessorAsync(task, cancellationToken);
return result.TotalRowsProcessed;
}
/// <summary>
/// Runs the post processor if configured.
/// </summary>
private async Task RunPostProcessorAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(task.Config.PostProcessorTypeName))
{
return;
}
_logger.LogDebug("Running post processor for {Table}", task.TableName);
var postProcessorType = ResolvePostProcessorType(task.Config.PostProcessorTypeName);
var postProcessor = (IPostProcessor)_serviceProvider.GetRequiredService(postProcessorType);
await postProcessor.ProcessAsync(task.TableName, cancellationToken);
}
/// <summary>
/// Resolves the fetcher type from the type name.
/// </summary>
private static Type ResolveFetcherType(string fetcherTypeName)
{
// Look for the type in the DataSync assembly and related assemblies
var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a =>
{
try { return a.GetTypes(); }
catch { return []; }
})
.Where(t => t.Name == fetcherTypeName || t.FullName == fetcherTypeName)
.ToList();
if (candidateTypes.Count == 0)
{
throw new InvalidOperationException($"Fetcher type '{fetcherTypeName}' not found");
}
if (candidateTypes.Count > 1)
{
throw new InvalidOperationException(
$"Multiple types found matching '{fetcherTypeName}': {string.Join(", ", candidateTypes.Select(t => t.FullName))}");
}
return candidateTypes[0];
}
/// <summary>
/// Resolves the post processor type from the type name.
/// </summary>
private static Type ResolvePostProcessorType(string postProcessorTypeName)
{
var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a =>
{
try { return a.GetTypes(); }
catch { return []; }
})
.Where(t => t.Name == postProcessorTypeName || t.FullName == postProcessorTypeName)
.Where(t => typeof(IPostProcessor).IsAssignableFrom(t))
.ToList();
if (candidateTypes.Count == 0)
{
throw new InvalidOperationException($"PostProcessor type '{postProcessorTypeName}' not found");
}
return candidateTypes[0];
}
/// <summary>
/// Enumerates an async enumerable using reflection.
/// Resolves GetAsyncEnumerator from IAsyncEnumerable interface to handle explicit implementations.
/// </summary>
private static async IAsyncEnumerable<object> EnumerateAsync(
object asyncEnumerable,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Find the IAsyncEnumerable<T> interface and get element type
var (getAsyncEnumeratorMethod, elementType) = FindGetAsyncEnumerator(asyncEnumerable.GetType());
if (getAsyncEnumeratorMethod == null || elementType == null)
{
throw new InvalidOperationException(
$"Type {asyncEnumerable.GetType().Name} does not implement IAsyncEnumerable<T>");
}
// Use generic helper to avoid reflection on MoveNextAsync/Current
var helperMethod = typeof(TableSyncOperation)
.GetMethod(nameof(EnumerateAsyncGeneric), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static)!
.MakeGenericMethod(elementType);
var result = (IAsyncEnumerable<object>)helperMethod.Invoke(null, [asyncEnumerable, cancellationToken])!;
await foreach (var item in result.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return item;
}
}
/// <summary>
/// Finds the GetAsyncEnumerator method from IAsyncEnumerable interface.
/// </summary>
private static (System.Reflection.MethodInfo? Method, Type? ElementType) FindGetAsyncEnumerator(Type type)
{
var iface = type.GetInterfaces()
.FirstOrDefault(i => i.IsGenericType &&
i.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>));
if (iface == null)
{
return (null, null);
}
var elementType = iface.GetGenericArguments()[0];
// Get the GetAsyncEnumerator method with CancellationToken parameter
var method = iface.GetMethod("GetAsyncEnumerator", [typeof(CancellationToken)])
?? iface.GetMethod("GetAsyncEnumerator", Type.EmptyTypes);
return (method, elementType);
}
/// <summary>
/// Generic helper to enumerate IAsyncEnumerable without reflection on enumerator methods.
/// </summary>
private static async IAsyncEnumerable<object> EnumerateAsyncGeneric<T>(
IAsyncEnumerable<T> source,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return item!;
}
return result.TotalRows;
}
}