diff --git a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs
index f2e056e..67422cd 100644
--- a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs
+++ b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs
@@ -1,29 +1,22 @@
using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Interfaces;
-using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Telemetry;
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
///
-/// Executes a single table sync operation.
+/// Executes a single table sync operation using the ETL pipeline.
///
public class TableSyncOperation : ITableSyncOperation
{
- private readonly IServiceProvider _serviceProvider;
- private readonly IDbConnectionFactory _connectionFactory;
+ private readonly IEtlPipelineFactory _pipelineFactory;
private readonly IDataUpdateRepository _updateRepository;
- private readonly IBulkMergeHelper _bulkMergeHelper;
- private readonly IMergeConfigurationRegistry _configRegistry;
private readonly IOptions _options;
private readonly ILogger _logger;
private readonly DataSyncMetrics _metrics;
@@ -32,20 +25,14 @@ public class TableSyncOperation : ITableSyncOperation
/// Initializes a new instance of the class.
///
public TableSyncOperation(
- IServiceProvider serviceProvider,
- IDbConnectionFactory connectionFactory,
+ IEtlPipelineFactory pipelineFactory,
IDataUpdateRepository updateRepository,
- IBulkMergeHelper bulkMergeHelper,
- IMergeConfigurationRegistry configRegistry,
IOptions options,
ILogger logger,
DataSyncMetrics metrics)
{
- _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
- _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
+ _pipelineFactory = pipelineFactory ?? throw new ArgumentNullException(nameof(pipelineFactory));
_updateRepository = updateRepository ?? throw new ArgumentNullException(nameof(updateRepository));
- _bulkMergeHelper = bulkMergeHelper ?? throw new ArgumentNullException(nameof(bulkMergeHelper));
- _configRegistry = configRegistry ?? throw new ArgumentNullException(nameof(configRegistry));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
@@ -142,274 +129,31 @@ public class TableSyncOperation : ITableSyncOperation
}
///
- /// Core sync logic that handles mass vs incremental updates.
+ /// Core sync logic that uses the ETL pipeline.
///
private async Task ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
- // Get the fetcher for this entity type
- var fetcherType = ResolveFetcherType(task.Config.FetcherTypeName);
- var fetcher = _serviceProvider.GetRequiredService(fetcherType);
+ // Determine sync mode based on update type
+ var syncMode = task.UpdateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
- // Use reflection to call FetchAsync on the fetcher
- var fetchMethod = fetcher.GetType().GetMethod("FetchAsync")
- ?? throw new InvalidOperationException($"FetchAsync method not found on {fetcher.GetType().Name}");
+ _logger.LogDebug("Building pipeline for {Table} in {Mode} mode", task.TableName, syncMode);
- var asyncEnumerable = fetchMethod.Invoke(fetcher, [task.MinimumDt, cancellationToken]);
+ // Build and execute the pipeline
+ var pipeline = _pipelineFactory
+ .ForTable(task.TableName)
+ .WithMode(syncMode)
+ .WithMinimumDate(task.MinimumDt)
+ .Build();
- // Get the element type for typed operations
- var (_, elementType) = FindGetAsyncEnumerator(asyncEnumerable!.GetType());
- if (elementType == null || !elementType.IsClass)
+ var result = await pipeline.ExecuteAsync(cancellationToken);
+
+ if (!result.Success)
{
throw new InvalidOperationException(
- $"Fetcher element type must be a class: {asyncEnumerable.GetType().Name}");
+ $"Pipeline failed for {task.TableName}: {result.Error?.Message ?? "Unknown error"}",
+ result.Error);
}
- // Handle mass update with truncation
- if (task.UpdateType == UpdateTypes.Mass && task.ScheduleConfig.PrepurgeData)
- {
- return await ExecuteMassUpdateAsync(
- asyncEnumerable!,
- task,
- elementType,
- cancellationToken);
- }
-
- // Handle incremental update with merge
- return await ExecuteIncrementalUpdateAsync(
- asyncEnumerable!,
- task,
- elementType,
- cancellationToken);
- }
-
- ///
- /// Executes mass update using BulkMergeHelper.
- ///
- private Task ExecuteMassUpdateAsync(
- object asyncEnumerable,
- DataUpdateTask task,
- Type elementType,
- CancellationToken cancellationToken)
- {
- _logger.LogDebug("Executing mass update for {Table}", task.TableName);
-
- // Use typed helper to call MassInsertAsync with correct type parameter
- var helper = typeof(TableSyncOperation)
- .GetMethod(nameof(ExecuteMassUpdateTypedAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
- .MakeGenericMethod(elementType);
-
- return (Task)helper.Invoke(this, [
- asyncEnumerable,
- task,
- cancellationToken
- ])!;
- }
-
- ///
- /// Typed helper for mass update.
- ///
- private async Task ExecuteMassUpdateTypedAsync(
- IAsyncEnumerable data,
- DataUpdateTask task,
- CancellationToken cancellationToken) where T : class
- {
- var config = _configRegistry.GetConfiguration();
-
- var result = await _bulkMergeHelper.MassInsertAsync(
- data,
- config.TableName,
- rebuildIndexes: task.ScheduleConfig.ReIndexData,
- batchSize: _options.Value.BulkCopyBatchSize,
- cancellationToken: cancellationToken);
-
- // Run post processor if configured
- await RunPostProcessorAsync(task, cancellationToken);
-
- return result.TotalRowsInserted;
- }
-
- ///
- /// Executes incremental update using BulkMergeHelper.MergeAsync.
- ///
- private Task ExecuteIncrementalUpdateAsync(
- object asyncEnumerable,
- DataUpdateTask task,
- Type elementType,
- CancellationToken cancellationToken)
- {
- _logger.LogDebug("Executing incremental update for {Table}", task.TableName);
-
- // Use typed helper to call MergeAsync with correct type parameter
- var helper = typeof(TableSyncOperation)
- .GetMethod(nameof(ExecuteIncrementalUpdateTypedAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
- .MakeGenericMethod(elementType);
-
- return (Task)helper.Invoke(this, [
- asyncEnumerable,
- task,
- cancellationToken
- ])!;
- }
-
- ///
- /// Typed helper for incremental update.
- ///
- private async Task ExecuteIncrementalUpdateTypedAsync(
- IAsyncEnumerable data,
- DataUpdateTask task,
- CancellationToken cancellationToken) where T : class
- {
- var config = _configRegistry.GetConfiguration();
-
- var result = await _bulkMergeHelper.MergeAsync(
- data,
- config.TableName,
- config.MatchOn,
- config.UpdateColumns,
- config.UpdateWhen,
- config.InsertColumns,
- batchSize: _options.Value.BatchSize,
- cancellationToken: cancellationToken);
-
- // Run post processor if configured
- await RunPostProcessorAsync(task, cancellationToken);
-
- return result.TotalRowsProcessed;
- }
-
- ///
- /// Runs the post processor if configured.
- ///
- private async Task RunPostProcessorAsync(DataUpdateTask task, CancellationToken cancellationToken)
- {
- if (string.IsNullOrEmpty(task.Config.PostProcessorTypeName))
- {
- return;
- }
-
- _logger.LogDebug("Running post processor for {Table}", task.TableName);
-
- var postProcessorType = ResolvePostProcessorType(task.Config.PostProcessorTypeName);
- var postProcessor = (IPostProcessor)_serviceProvider.GetRequiredService(postProcessorType);
-
- await postProcessor.ProcessAsync(task.TableName, cancellationToken);
- }
-
- ///
- /// Resolves the fetcher type from the type name.
- ///
- private static Type ResolveFetcherType(string fetcherTypeName)
- {
- // Look for the type in the DataSync assembly and related assemblies
- var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
- .SelectMany(a =>
- {
- try { return a.GetTypes(); }
- catch { return []; }
- })
- .Where(t => t.Name == fetcherTypeName || t.FullName == fetcherTypeName)
- .ToList();
-
- if (candidateTypes.Count == 0)
- {
- throw new InvalidOperationException($"Fetcher type '{fetcherTypeName}' not found");
- }
-
- if (candidateTypes.Count > 1)
- {
- throw new InvalidOperationException(
- $"Multiple types found matching '{fetcherTypeName}': {string.Join(", ", candidateTypes.Select(t => t.FullName))}");
- }
-
- return candidateTypes[0];
- }
-
- ///
- /// Resolves the post processor type from the type name.
- ///
- private static Type ResolvePostProcessorType(string postProcessorTypeName)
- {
- var candidateTypes = AppDomain.CurrentDomain.GetAssemblies()
- .SelectMany(a =>
- {
- try { return a.GetTypes(); }
- catch { return []; }
- })
- .Where(t => t.Name == postProcessorTypeName || t.FullName == postProcessorTypeName)
- .Where(t => typeof(IPostProcessor).IsAssignableFrom(t))
- .ToList();
-
- if (candidateTypes.Count == 0)
- {
- throw new InvalidOperationException($"PostProcessor type '{postProcessorTypeName}' not found");
- }
-
- return candidateTypes[0];
- }
-
- ///
- /// Enumerates an async enumerable using reflection.
- /// Resolves GetAsyncEnumerator from IAsyncEnumerable interface to handle explicit implementations.
- ///
- private static async IAsyncEnumerable