feat: implement ETL pipeline redesign and ConfigManager improvements

- Add pipeline registry with JSON-based configuration and hot-reload support
- Implement manual sync request feature with API, client UI, and database
- Improve ConfigManager: connection string dropdown in pipeline editor,
  step delete/reorder functionality, and fix JSON parsing for ConnectionStrings
This commit is contained in:
Joseph Doherty
2026-01-22 17:48:33 -05:00
parent 5a332232d0
commit 29ac56006d
82 changed files with 6257 additions and 296 deletions
@@ -0,0 +1,22 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for the pipeline destination.
/// </summary>
public class DestinationElement
{
/// <summary>
/// Target table name in the cache database.
/// </summary>
public string Table { get; set; } = string.Empty;
/// <summary>
/// Columns used to match existing records for upsert.
/// </summary>
public List<string> MatchColumns { get; set; } = [];
/// <summary>
/// Columns to exclude from UPDATE operations.
/// </summary>
public List<string> ExcludeFromUpdate { get; set; } = [];
}
@@ -0,0 +1,78 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Represents an ETL pipeline definition loaded from a JSON file.
/// </summary>
public class EtlPipelineConfig
{
/// <summary>
/// Unique name of the pipeline (must match filename, case-insensitive).
/// </summary>
public string Name { get; set; } = string.Empty;
/// <summary>
/// Whether the pipeline is enabled for execution.
/// </summary>
public bool IsEnabled { get; set; } = true;
/// <summary>
/// If true, pipeline can only be triggered via ManualSyncRequest.
/// No interval validation is required for manual-only pipelines.
/// </summary>
public bool IsManualOnly { get; set; }
/// <summary>
/// Interval for mass sync in minutes. Null if mass sync not supported.
/// </summary>
public int? MassSyncIntervalMinutes { get; set; }
/// <summary>
/// Interval for daily sync in minutes. Null if daily sync not supported.
/// </summary>
public int? DailySyncIntervalMinutes { get; set; }
/// <summary>
/// Interval for hourly sync in minutes. Null if hourly sync not supported.
/// </summary>
public int? HourlySyncIntervalMinutes { get; set; }
/// <summary>
/// Scripts to run before the main sync. Optional.
/// </summary>
public List<ScriptElement> PreScripts { get; set; } = [];
/// <summary>
/// The data source configuration. Required.
/// </summary>
public SourceElement Source { get; set; } = null!;
/// <summary>
/// Data transformations to apply. Optional.
/// </summary>
public List<TransformElement> Transforms { get; set; } = [];
/// <summary>
/// The destination configuration. Required.
/// </summary>
public DestinationElement Destination { get; set; } = null!;
/// <summary>
/// Scripts to run after the main sync. Optional.
/// </summary>
public List<ScriptElement> PostScripts { get; set; } = [];
/// <summary>
/// Gets a value indicating whether the pipeline supports mass sync.
/// </summary>
public bool SupportsMassSync => MassSyncIntervalMinutes.HasValue;
/// <summary>
/// Gets a value indicating whether the pipeline supports daily sync.
/// </summary>
public bool SupportsDailySync => DailySyncIntervalMinutes.HasValue;
/// <summary>
/// Gets a value indicating whether the pipeline supports hourly sync.
/// </summary>
public bool SupportsHourlySync => HourlySyncIntervalMinutes.HasValue;
}
@@ -0,0 +1,27 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for a query parameter.
/// </summary>
public class ParameterElement
{
/// <summary>
/// Parameter name as used in query (e.g., ":dateUpdated").
/// </summary>
public string Name { get; set; } = string.Empty;
/// <summary>
/// Format conversion (jdeJulian, jdeTime, etc.).
/// </summary>
public string? Format { get; set; }
/// <summary>
/// Source of the value (offset = from last sync time).
/// </summary>
public string Source { get; set; } = "offset";
/// <summary>
/// Static value if source is not offset.
/// </summary>
public string? Value { get; set; }
}
@@ -0,0 +1,17 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for a pre/post script.
/// </summary>
public class ScriptElement
{
/// <summary>
/// Connection identifier for script execution.
/// </summary>
public string Connection { get; set; } = "lotfinder";
/// <summary>
/// SQL script to execute.
/// </summary>
public string Script { get; set; } = string.Empty;
}
@@ -0,0 +1,27 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for the pipeline data source.
/// </summary>
public class SourceElement
{
/// <summary>
/// Connection identifier (jde, cms, giw, lotfinder).
/// </summary>
public string Connection { get; set; } = string.Empty;
/// <summary>
/// Query for incremental syncs (daily/hourly).
/// </summary>
public string Query { get; set; } = string.Empty;
/// <summary>
/// Query for mass sync. Falls back to Query if not specified.
/// </summary>
public string? MassQuery { get; set; }
/// <summary>
/// Query parameters with format and source configuration.
/// </summary>
public Dictionary<string, ParameterElement> Parameters { get; set; } = new();
}
@@ -0,0 +1,21 @@
using System.Text.Json;
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for a data transformation.
/// </summary>
public class TransformElement
{
/// <summary>
/// Type of transformation (ColumnDrop, ColumnRename, JdeDate, Regex, etc.).
/// </summary>
public string TransformType { get; set; } = string.Empty;
/// <summary>
/// Transform-specific configuration as raw JSON.
/// Using JsonElement avoids Dictionary&lt;string, object&gt; deserialization issues
/// where values would become JsonElement anyway without custom converters.
/// </summary>
public JsonElement? Config { get; set; }
}
@@ -1,3 +1,5 @@
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
@@ -11,4 +13,12 @@ public interface ISyncOrchestrator
/// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
/// <returns>A task representing the async operation.</returns>
Task ExecutePendingSyncsAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Executes a single sync operation (used for manual sync requests).
/// </summary>
/// <param name="task">The sync task to execute.</param>
/// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
/// <returns>A task representing the async operation.</returns>
Task ExecuteSingleSyncAsync(DataUpdateTask task, CancellationToken cancellationToken = default);
}
@@ -42,6 +42,13 @@ public static class DataSyncDependencyInjection
// Pipeline factory (new ETL infrastructure)
services.AddSingleton<IEtlPipelineFactory, EtlPipelineFactory>();
// Pipeline registry services (new hot-reload infrastructure)
services.AddSingleton<IPipelineValidator, PipelineValidator>();
services.AddSingleton<IPipelineRegistry, PipelineRegistry>();
// Pipeline registry initializer - runs before WorkProcessor to ensure pipelines load first
services.AddHostedService<PipelineRegistryInitializer>();
// Register hosted service (WorkProcessor combines data sync and search processing)
services.AddHostedService<WorkProcessor>();
@@ -1,4 +1,5 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Options;
namespace JdeScoping.DataSync.Models;
@@ -19,7 +20,7 @@ public class DataUpdateTask
public required string TableName { get; init; }
/// <summary>
/// Source system: "JDE" or "CMS".
/// Source system: "JDE", "CMS", "GIW", or "LOTFINDER".
/// </summary>
public required string SourceSystem { get; init; }
@@ -40,9 +41,15 @@ public class DataUpdateTask
public DateTime? MinimumDt { get; init; }
/// <summary>
/// The data source configuration for this task.
/// The pipeline configuration for this task (new format).
/// </summary>
public required DataSourceConfig Config { get; init; }
public EtlPipelineConfig? Pipeline { get; init; }
/// <summary>
/// The data source configuration for this task (legacy format - will be removed).
/// </summary>
[Obsolete("Use Pipeline instead. This property exists for backward compatibility during migration.")]
public DataSourceConfig? Config { get; init; }
/// <summary>
/// Gets a unique key for logging purposes.
@@ -50,13 +57,47 @@ public class DataUpdateTask
public string LogKey => $"{TableName}_{UpdateType}_{OperationId:N}";
/// <summary>
/// Gets the schedule configuration for this update type.
/// Gets the interval in minutes for this update type from the pipeline config.
/// </summary>
public ScheduleConfig ScheduleConfig => UpdateType switch
public int? IntervalMinutes => Pipeline != null ? UpdateType switch
{
UpdateTypes.Mass => Config.MassConfig,
UpdateTypes.Daily => Config.DailyConfig,
UpdateTypes.Hourly => Config.HourlyConfig,
_ => throw new ArgumentOutOfRangeException(nameof(UpdateType))
};
UpdateTypes.Mass => Pipeline.MassSyncIntervalMinutes,
UpdateTypes.Daily => Pipeline.DailySyncIntervalMinutes,
UpdateTypes.Hourly => Pipeline.HourlySyncIntervalMinutes,
_ => null
} : null;
/// <summary>
/// Creates a DataUpdateTask from an EtlPipelineConfig.
/// </summary>
public static DataUpdateTask FromPipeline(
EtlPipelineConfig pipeline,
UpdateTypes updateType,
DateTime? minimumDt = null)
{
return new DataUpdateTask
{
TableName = pipeline.Destination.Table,
SourceSystem = MapConnectionToSourceSystem(pipeline.Source.Connection),
SourceData = pipeline.Name,
UpdateType = updateType,
MinimumDt = minimumDt,
Pipeline = pipeline
};
}
/// <summary>
/// Maps connection identifier to source system name.
/// </summary>
private static string MapConnectionToSourceSystem(string connection)
{
return connection.ToUpperInvariant() switch
{
"JDE" => "JDE",
"CMS" => "CMS",
"GIW" => "GIW",
"LOTFINDER" => "LOTFINDER",
_ => connection.ToUpperInvariant()
};
}
}
@@ -58,6 +58,18 @@ public class DataSyncOptions
/// </summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// Directory containing pipeline.*.json files.
/// Resolved relative to content root.
/// </summary>
public string PipelinesDirectory { get; set; } = "Pipelines";
/// <summary>
/// If true (default), startup fails if any enabled pipeline is invalid.
/// If false, invalid enabled pipelines are skipped with warnings.
/// </summary>
public bool StrictPipelineValidation { get; set; } = true;
/// <summary>
/// Per-table data source configurations.
/// </summary>
@@ -34,4 +34,12 @@ public class WorkProcessorOptions
/// </summary>
[Range(1, 365)]
public int PurgeRetentionDays { get; set; } = 30;
/// <summary>
/// Maximum manual sync requests to process per work cycle.
/// Prevents manual requests from starving scheduled syncs.
/// Default: 5. Set to 0 for unlimited (not recommended).
/// </summary>
[Range(0, 100)]
public int MaxManualRequestsPerCycle { get; set; } = 5;
}
@@ -0,0 +1,115 @@
using JdeScoping.DataSync.Configuration;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Registry for ETL pipeline definitions with hot reload support.
/// </summary>
public interface IPipelineRegistry
{
/// <summary>
/// Gets all loaded pipelines (immutable snapshot).
/// </summary>
IReadOnlyList<EtlPipelineConfig> GetAllPipelines();
/// <summary>
/// Gets all enabled pipelines (immutable snapshot).
/// </summary>
IReadOnlyList<EtlPipelineConfig> GetEnabledPipelines();
/// <summary>
/// Gets a pipeline by name (case-insensitive).
/// </summary>
/// <param name="name">The pipeline name to find.</param>
/// <returns>The pipeline if found, or null.</returns>
EtlPipelineConfig? GetPipeline(string name);
/// <summary>
/// Validates that a pipeline and sync type combination is valid.
/// </summary>
/// <param name="pipelineName">The pipeline name.</param>
/// <param name="syncType">The sync type (mass, daily, hourly).</param>
/// <returns>True if the combination is valid.</returns>
bool IsValidPipelineAndSyncType(string pipelineName, string syncType);
/// <summary>
/// Reloads all pipelines from disk.
/// Returns validation results for each file.
/// Reload is atomic: on any enabled pipeline failure, keeps previous snapshot.
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>The reload result.</returns>
Task<PipelineReloadResult> ReloadAsync(CancellationToken ct = default);
/// <summary>
/// Gets the current registry version (increments on successful reload).
/// </summary>
int Version { get; }
/// <summary>
/// Gets the timestamp of the last successful load.
/// </summary>
DateTime? LastLoadedAt { get; }
}
/// <summary>
/// Result of a pipeline reload operation.
/// </summary>
public class PipelineReloadResult
{
/// <summary>
/// Gets or sets a value indicating whether the reload was successful.
/// </summary>
public bool Success { get; set; }
/// <summary>
/// Gets or sets the number of pipelines loaded.
/// </summary>
public int PipelinesLoaded { get; set; }
/// <summary>
/// Gets or sets the number of pipelines skipped due to errors.
/// </summary>
public int PipelinesSkipped { get; set; }
/// <summary>
/// Gets or sets the previous version before reload.
/// </summary>
public int PreviousVersion { get; set; }
/// <summary>
/// Gets or sets the new version after reload (unchanged if reload failed).
/// </summary>
public int NewVersion { get; set; }
/// <summary>
/// Gets or sets the list of validation errors.
/// </summary>
public List<PipelineLoadError> Errors { get; set; } = [];
}
/// <summary>
/// Represents an error that occurred while loading a pipeline.
/// </summary>
public class PipelineLoadError
{
/// <summary>
/// Gets or sets the file name that had the error.
/// </summary>
public string FileName { get; set; } = string.Empty;
/// <summary>
/// Gets or sets the pipeline name (if parseable).
/// </summary>
public string PipelineName { get; set; } = string.Empty;
/// <summary>
/// Gets or sets the error type (parse, validation, file).
/// </summary>
public string ErrorType { get; set; } = string.Empty;
/// <summary>
/// Gets or sets the error messages.
/// </summary>
public List<string> Messages { get; set; } = [];
}
@@ -0,0 +1,38 @@
using JdeScoping.DataSync.Configuration;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Validates pipeline definitions.
/// </summary>
public interface IPipelineValidator
{
/// <summary>
/// Validates a pipeline definition.
/// </summary>
/// <param name="pipeline">The pipeline to validate.</param>
/// <param name="fileName">The source file name for error messages.</param>
/// <returns>Validation result with errors and warnings.</returns>
PipelineValidationResult Validate(EtlPipelineConfig pipeline, string fileName);
}
/// <summary>
/// Result of validating a single pipeline.
/// </summary>
public class PipelineValidationResult
{
/// <summary>
/// Gets or sets a value indicating whether the pipeline is valid.
/// </summary>
public bool IsValid { get; set; }
/// <summary>
/// Gets or sets the list of validation errors.
/// </summary>
public List<string> Errors { get; set; } = [];
/// <summary>
/// Gets or sets the list of validation warnings.
/// </summary>
public List<string> Warnings { get; set; } = [];
}
@@ -0,0 +1,263 @@
using System.Text.Json;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Registry for ETL pipeline definitions with hot reload support.
/// Uses immutable snapshots for thread-safe reads.
/// </summary>
public class PipelineRegistry : IPipelineRegistry
{
private readonly IOptions<DataSyncOptions> _options;
private readonly IPipelineValidator _validator;
private readonly ILogger<PipelineRegistry> _logger;
private readonly IHostEnvironment _environment;
/// <summary>
/// Immutable snapshot - swapped atomically.
/// </summary>
private volatile PipelineSnapshot _snapshot = PipelineSnapshot.Empty;
/// <summary>
/// Serializes reload operations (only one reload at a time).
/// </summary>
private readonly SemaphoreSlim _reloadLock = new(1, 1);
/// <summary>
/// Version incremented via Interlocked.
/// </summary>
private int _version;
private static readonly JsonSerializerOptions PipelineJsonOptions = new()
{
PropertyNameCaseInsensitive = true,
ReadCommentHandling = JsonCommentHandling.Skip,
AllowTrailingCommas = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = true,
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
};
/// <summary>
/// Initializes a new instance of the <see cref="PipelineRegistry"/> class.
/// </summary>
public PipelineRegistry(
IOptions<DataSyncOptions> options,
IPipelineValidator validator,
ILogger<PipelineRegistry> logger,
IHostEnvironment environment)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_validator = validator ?? throw new ArgumentNullException(nameof(validator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
}
/// <inheritdoc/>
public IReadOnlyList<EtlPipelineConfig> GetAllPipelines() => _snapshot.AllPipelines;
/// <inheritdoc/>
public IReadOnlyList<EtlPipelineConfig> GetEnabledPipelines() => _snapshot.EnabledPipelines;
/// <inheritdoc/>
public EtlPipelineConfig? GetPipeline(string name) => _snapshot.GetByName(name);
/// <inheritdoc/>
public int Version => _version;
/// <inheritdoc/>
public DateTime? LastLoadedAt => _snapshot.LoadedAt == default ? null : _snapshot.LoadedAt;
/// <inheritdoc/>
public bool IsValidPipelineAndSyncType(string pipelineName, string syncType)
{
var pipeline = GetPipeline(pipelineName);
if (pipeline == null || !pipeline.IsEnabled)
{
return false;
}
return syncType.ToLowerInvariant() switch
{
"mass" => pipeline.SupportsMassSync,
"daily" => pipeline.SupportsDailySync,
"hourly" => pipeline.SupportsHourlySync,
_ => false
};
}
/// <inheritdoc/>
public async Task<PipelineReloadResult> ReloadAsync(CancellationToken ct = default)
{
await _reloadLock.WaitAsync(ct);
try
{
var newSnapshot = await LoadSnapshotAsync(ct);
var previousVersion = _version;
// Atomic swap only on success
if (newSnapshot.Result.Success)
{
Interlocked.Exchange(ref _snapshot, newSnapshot);
Interlocked.Increment(ref _version);
}
newSnapshot.Result.PreviousVersion = previousVersion;
newSnapshot.Result.NewVersion = _version;
return newSnapshot.Result;
}
finally
{
_reloadLock.Release();
}
}
private async Task<PipelineSnapshot> LoadSnapshotAsync(CancellationToken ct)
{
var result = new PipelineReloadResult();
var pipelines = new List<EtlPipelineConfig>();
var seenNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
// Resolve directory path relative to content root
var directory = Path.Combine(
_environment.ContentRootPath,
_options.Value.PipelinesDirectory);
// Handle missing directory
if (!Directory.Exists(directory))
{
result.Errors.Add(new PipelineLoadError
{
FileName = directory,
ErrorType = "file",
Messages = ["Pipeline directory does not exist"]
});
result.Success = false;
return new PipelineSnapshot([], result);
}
// Load each pipeline file
var files = Directory.GetFiles(directory, "pipeline.*.json");
_logger.LogDebug("Found {Count} pipeline files in {Directory}", files.Length, directory);
foreach (var file in files)
{
ct.ThrowIfCancellationRequested();
var fileName = Path.GetFileName(file);
EtlPipelineConfig? pipeline = null;
// Parse JSON
try
{
var json = await File.ReadAllTextAsync(file, ct);
pipeline = JsonSerializer.Deserialize<EtlPipelineConfig>(json, PipelineJsonOptions);
}
catch (JsonException ex)
{
result.Errors.Add(new PipelineLoadError
{
FileName = fileName,
ErrorType = "parse",
Messages = [$"JSON parse error: {ex.Message}"]
});
result.PipelinesSkipped++;
continue;
}
catch (IOException ex)
{
result.Errors.Add(new PipelineLoadError
{
FileName = fileName,
ErrorType = "file",
Messages = [$"File read error: {ex.Message}"]
});
result.PipelinesSkipped++;
continue;
}
if (pipeline == null)
{
result.Errors.Add(new PipelineLoadError
{
FileName = fileName,
ErrorType = "parse",
Messages = ["Deserialized to null"]
});
result.PipelinesSkipped++;
continue;
}
// Check for duplicate names
if (!seenNames.Add(pipeline.Name))
{
result.Errors.Add(new PipelineLoadError
{
FileName = fileName,
PipelineName = pipeline.Name,
ErrorType = "validation",
Messages = [$"Duplicate pipeline name: {pipeline.Name}"]
});
result.PipelinesSkipped++;
continue;
}
// Validate pipeline
var validation = _validator.Validate(pipeline, fileName);
if (!validation.IsValid)
{
result.Errors.Add(new PipelineLoadError
{
FileName = fileName,
PipelineName = pipeline.Name,
ErrorType = "validation",
Messages = validation.Errors
});
// Only skip if enabled (disabled invalid pipelines are warnings)
if (pipeline.IsEnabled)
{
result.PipelinesSkipped++;
continue;
}
// Disabled invalid pipelines are logged as warnings but added
_logger.LogWarning(
"Disabled pipeline {Name} has validation errors: {Errors}",
pipeline.Name,
string.Join("; ", validation.Errors));
}
// Log warnings
foreach (var warning in validation.Warnings)
{
_logger.LogWarning("Pipeline {Name}: {Warning}", pipeline.Name, warning);
}
pipelines.Add(pipeline);
result.PipelinesLoaded++;
}
// Determine success: all enabled pipelines must be valid
// Check if any errors are for enabled pipelines that were skipped
var hasEnabledErrors = result.Errors.Any(e =>
{
// If this error caused a pipeline to be skipped (not in our list),
// and it was for an enabled pipeline, it's a failure
var matchingPipeline = pipelines.FirstOrDefault(p =>
string.Equals(p.Name, e.PipelineName, StringComparison.OrdinalIgnoreCase));
// If pipeline wasn't added to list, it was skipped (enabled + invalid)
return matchingPipeline == null && !string.IsNullOrEmpty(e.PipelineName);
});
result.Success = !hasEnabledErrors && result.Errors.All(e => e.ErrorType != "file" || e.Messages.All(m => !m.Contains("does not exist")));
return new PipelineSnapshot(pipelines, result);
}
}
@@ -0,0 +1,89 @@
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Initializes the pipeline registry at application startup.
/// Runs as a hosted service to properly handle async loading.
/// </summary>
public class PipelineRegistryInitializer : IHostedService
{
private readonly IPipelineRegistry _registry;
private readonly IOptions<DataSyncOptions> _options;
private readonly IHostApplicationLifetime _lifetime;
private readonly ILogger<PipelineRegistryInitializer> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="PipelineRegistryInitializer"/> class.
/// </summary>
public PipelineRegistryInitializer(
IPipelineRegistry registry,
IOptions<DataSyncOptions> options,
IHostApplicationLifetime lifetime,
ILogger<PipelineRegistryInitializer> logger)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_options = options ?? throw new ArgumentNullException(nameof(options));
_lifetime = lifetime ?? throw new ArgumentNullException(nameof(lifetime));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task StartAsync(CancellationToken ct)
{
_logger.LogInformation("Loading pipeline definitions from {Directory}...",
_options.Value.PipelinesDirectory);
var result = await _registry.ReloadAsync(ct);
if (!result.Success && _options.Value.StrictPipelineValidation)
{
_logger.LogCritical(
"Pipeline validation failed with {ErrorCount} errors. " +
"Application will stop. Set StrictPipelineValidation=false to allow.",
result.Errors.Count);
foreach (var error in result.Errors)
{
_logger.LogError(
"Pipeline {Name} ({File}): [{Type}] {Messages}",
error.PipelineName,
error.FileName,
error.ErrorType,
string.Join("; ", error.Messages));
}
_lifetime.StopApplication();
return;
}
if (result.Errors.Count > 0)
{
_logger.LogWarning(
"Pipeline loading completed with {ErrorCount} warnings",
result.Errors.Count);
foreach (var error in result.Errors)
{
_logger.LogWarning(
"Pipeline {Name} ({File}): [{Type}] {Messages}",
error.PipelineName,
error.FileName,
error.ErrorType,
string.Join("; ", error.Messages));
}
}
_logger.LogInformation(
"Loaded {Count} pipelines ({Enabled} enabled) - version {Version}",
result.PipelinesLoaded,
_registry.GetEnabledPipelines().Count,
result.NewVersion);
}
/// <inheritdoc/>
public Task StopAsync(CancellationToken ct) => Task.CompletedTask;
}
@@ -0,0 +1,61 @@
using JdeScoping.DataSync.Configuration;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Immutable snapshot of loaded pipelines.
/// </summary>
internal sealed class PipelineSnapshot
{
/// <summary>
/// Empty snapshot with no pipelines.
/// </summary>
public static readonly PipelineSnapshot Empty = new([], new PipelineReloadResult { Success = true });
private readonly Dictionary<string, EtlPipelineConfig> _byName;
/// <summary>
/// Gets all loaded pipelines.
/// </summary>
public IReadOnlyList<EtlPipelineConfig> AllPipelines { get; }
/// <summary>
/// Gets only enabled pipelines.
/// </summary>
public IReadOnlyList<EtlPipelineConfig> EnabledPipelines { get; }
/// <summary>
/// Gets the reload result that created this snapshot.
/// </summary>
public PipelineReloadResult Result { get; }
/// <summary>
/// Gets the timestamp when this snapshot was loaded.
/// </summary>
public DateTime LoadedAt { get; }
/// <summary>
/// Initializes a new instance of the <see cref="PipelineSnapshot"/> class.
/// </summary>
/// <param name="pipelines">The list of pipelines to include.</param>
/// <param name="result">The reload result.</param>
public PipelineSnapshot(IReadOnlyList<EtlPipelineConfig> pipelines, PipelineReloadResult result)
{
AllPipelines = pipelines;
EnabledPipelines = pipelines.Where(p => p.IsEnabled).ToList();
Result = result;
LoadedAt = DateTime.UtcNow;
_byName = pipelines.ToDictionary(
p => p.Name,
p => p,
StringComparer.OrdinalIgnoreCase);
}
/// <summary>
/// Gets a pipeline by name (case-insensitive).
/// </summary>
/// <param name="name">The pipeline name.</param>
/// <returns>The pipeline if found, or null.</returns>
public EtlPipelineConfig? GetByName(string name) =>
_byName.TryGetValue(name, out var p) ? p : null;
}
@@ -0,0 +1,168 @@
using JdeScoping.DataSync.Configuration;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Validates pipeline definitions according to the schema and business rules.
/// </summary>
public class PipelineValidator : IPipelineValidator
{
private static readonly HashSet<string> ValidConnections = new(StringComparer.OrdinalIgnoreCase)
{
"jde", "cms", "giw", "lotfinder"
};
/// <inheritdoc/>
public PipelineValidationResult Validate(EtlPipelineConfig pipeline, string fileName)
{
var result = new PipelineValidationResult { IsValid = true };
// Extract expected name from filename (pipeline.{Name}.json)
var expectedName = ExtractPipelineNameFromFileName(fileName);
// 1. Name must match filename (case-insensitive)
if (!string.Equals(pipeline.Name, expectedName, StringComparison.OrdinalIgnoreCase))
{
result.Errors.Add($"Pipeline name '{pipeline.Name}' does not match filename '{fileName}' (expected '{expectedName}')");
}
// 2. Source is required
if (pipeline.Source == null)
{
result.Errors.Add("Source is required");
}
else
{
ValidateSource(pipeline.Source, result);
}
// 3. Destination is required
if (pipeline.Destination == null)
{
result.Errors.Add("Destination is required");
}
else
{
ValidateDestination(pipeline.Destination, result);
}
// 4. Interval validation (only for enabled, non-manual-only pipelines)
if (pipeline.IsEnabled && !pipeline.IsManualOnly)
{
ValidateIntervals(pipeline, result);
}
// 5. Script validation
ValidateScripts(pipeline.PreScripts, "PreScripts", result);
ValidateScripts(pipeline.PostScripts, "PostScripts", result);
// 6. Warning: MassQuery missing when mass interval set and query has parameters
if (pipeline.MassSyncIntervalMinutes.HasValue &&
pipeline.Source?.Parameters?.Count > 0 &&
string.IsNullOrEmpty(pipeline.Source.MassQuery))
{
result.Warnings.Add("MassQuery is not specified but pipeline has parameters and mass sync is enabled. Mass sync will use the incremental query which may not work correctly.");
}
// 7. Warning: Hourly without daily
if (pipeline.HourlySyncIntervalMinutes.HasValue && !pipeline.DailySyncIntervalMinutes.HasValue)
{
result.Warnings.Add("HourlySyncIntervalMinutes is set without DailySyncIntervalMinutes. Consider adding daily sync.");
}
result.IsValid = result.Errors.Count == 0;
return result;
}
private static string ExtractPipelineNameFromFileName(string fileName)
{
// Expected format: pipeline.{Name}.json
if (fileName.StartsWith("pipeline.", StringComparison.OrdinalIgnoreCase) &&
fileName.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
{
return fileName.Substring(9, fileName.Length - 9 - 5); // Remove "pipeline." and ".json"
}
return fileName;
}
private static void ValidateSource(SourceElement source, PipelineValidationResult result)
{
// Connection must be valid
if (string.IsNullOrWhiteSpace(source.Connection))
{
result.Errors.Add("Source.Connection is required");
}
else if (!ValidConnections.Contains(source.Connection))
{
result.Errors.Add($"Source.Connection '{source.Connection}' is not valid. Expected one of: {string.Join(", ", ValidConnections)}");
}
// Query is required
if (string.IsNullOrWhiteSpace(source.Query))
{
result.Errors.Add("Source.Query is required");
}
}
private static void ValidateDestination(DestinationElement destination, PipelineValidationResult result)
{
// Table is required
if (string.IsNullOrWhiteSpace(destination.Table))
{
result.Errors.Add("Destination.Table is required");
}
// MatchColumns must have at least one entry
if (destination.MatchColumns == null || destination.MatchColumns.Count == 0)
{
result.Errors.Add("Destination.MatchColumns must have at least one column");
}
}
private static void ValidateIntervals(EtlPipelineConfig pipeline, PipelineValidationResult result)
{
// At least one interval must be set for enabled, non-manual-only pipelines
var hasAnyInterval = pipeline.MassSyncIntervalMinutes.HasValue ||
pipeline.DailySyncIntervalMinutes.HasValue ||
pipeline.HourlySyncIntervalMinutes.HasValue;
if (!hasAnyInterval)
{
result.Errors.Add("At least one sync interval (mass, daily, or hourly) must be set for enabled pipelines. Set IsManualOnly=true for manual-only pipelines.");
}
// All non-null intervals must be positive
if (pipeline.MassSyncIntervalMinutes.HasValue && pipeline.MassSyncIntervalMinutes.Value <= 0)
{
result.Errors.Add("MassSyncIntervalMinutes must be greater than 0");
}
if (pipeline.DailySyncIntervalMinutes.HasValue && pipeline.DailySyncIntervalMinutes.Value <= 0)
{
result.Errors.Add("DailySyncIntervalMinutes must be greater than 0");
}
if (pipeline.HourlySyncIntervalMinutes.HasValue && pipeline.HourlySyncIntervalMinutes.Value <= 0)
{
result.Errors.Add("HourlySyncIntervalMinutes must be greater than 0");
}
}
private static void ValidateScripts(List<ScriptElement>? scripts, string scriptType, PipelineValidationResult result)
{
if (scripts == null)
{
return;
}
for (var i = 0; i < scripts.Count; i++)
{
var script = scripts[i];
if (string.IsNullOrWhiteSpace(script.Script))
{
result.Errors.Add($"{scriptType}[{i}].Script is required");
}
}
}
}
@@ -1,6 +1,7 @@
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Infrastructure;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
@@ -11,10 +12,12 @@ namespace JdeScoping.DataSync.Services;
/// <summary>
/// Checks schedules and determines which sync tasks need to be executed.
/// Uses the pipeline registry for pipeline definitions.
/// </summary>
public class ScheduleChecker : IScheduleChecker
{
private readonly IDataUpdateRepository _repository;
private readonly IPipelineRegistry _pipelineRegistry;
private readonly IOptions<DataSyncOptions> _options;
private readonly ILogger<ScheduleChecker> _logger;
@@ -22,14 +25,17 @@ public class ScheduleChecker : IScheduleChecker
/// Initializes a new instance of the <see cref="ScheduleChecker"/> class.
/// </summary>
/// <param name="repository">Repository for data update records.</param>
/// <param name="pipelineRegistry">Registry of pipeline definitions.</param>
/// <param name="options">Data sync configuration options.</param>
/// <param name="logger">Logger instance.</param>
public ScheduleChecker(
IDataUpdateRepository repository,
IPipelineRegistry pipelineRegistry,
IOptions<DataSyncOptions> options,
ILogger<ScheduleChecker> logger)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_pipelineRegistry = pipelineRegistry ?? throw new ArgumentNullException(nameof(pipelineRegistry));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -41,9 +47,16 @@ public class ScheduleChecker : IScheduleChecker
var tasks = new List<DataUpdateTask>();
var now = DateTime.UtcNow;
foreach (var config in _options.Value.DataSources.Where(c => c.IsEnabled))
// Use the new pipeline registry for scheduling
foreach (var pipeline in _pipelineRegistry.GetEnabledPipelines())
{
var task = CheckConfigSchedule(config, lastUpdates, now);
// Skip manual-only pipelines for scheduled execution
if (pipeline.IsManualOnly)
{
continue;
}
var task = CheckPipelineSchedule(pipeline, lastUpdates, now);
if (task != null)
{
tasks.Add(task);
@@ -66,63 +79,65 @@ public class ScheduleChecker : IScheduleChecker
}
/// <summary>
/// Checks a single data source config and returns a task if sync is needed.
/// Checks a single pipeline and returns a task if sync is needed.
/// Priority order: Mass > Daily > Hourly
/// </summary>
private DataUpdateTask? CheckConfigSchedule(
DataSourceConfig config,
private DataUpdateTask? CheckPipelineSchedule(
EtlPipelineConfig pipeline,
Dictionary<string, DataUpdate> lastUpdates,
DateTime now)
{
var tableName = pipeline.Destination.Table;
// Get last updates for each type
var massKey = GetUpdateKey(config.TableName, UpdateTypes.Mass);
var dailyKey = GetUpdateKey(config.TableName, UpdateTypes.Daily);
var hourlyKey = GetUpdateKey(config.TableName, UpdateTypes.Hourly);
var massKey = GetUpdateKey(tableName, UpdateTypes.Mass);
var dailyKey = GetUpdateKey(tableName, UpdateTypes.Daily);
var hourlyKey = GetUpdateKey(tableName, UpdateTypes.Hourly);
lastUpdates.TryGetValue(massKey, out var lastMass);
lastUpdates.TryGetValue(dailyKey, out var lastDaily);
lastUpdates.TryGetValue(hourlyKey, out var lastHourly);
// Check Mass first (highest priority)
if (config.MassConfig.Enabled && NeedsMassSync(config, lastMass, now))
if (pipeline.SupportsMassSync && NeedsMassSync(pipeline, lastMass, now))
{
_logger.LogDebug(
"Mass sync needed for {Table}: last={LastSync}, interval={Interval}m",
config.TableName,
tableName,
lastMass?.EndDt?.ToString("o") ?? "never",
config.MassConfig.IntervalMinutes);
pipeline.MassSyncIntervalMinutes);
return CreateTask(config, UpdateTypes.Mass, null);
return DataUpdateTask.FromPipeline(pipeline, UpdateTypes.Mass);
}
// Check Daily
if (config.DailyConfig.Enabled && NeedsDailySync(config, lastDaily, lastMass, now))
if (pipeline.SupportsDailySync && NeedsDailySync(pipeline, lastDaily, lastMass, now))
{
var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);
var minimumDt = CalculateMinimumDt(lastDaily, pipeline.DailySyncIntervalMinutes!.Value);
_logger.LogDebug(
"Daily sync needed for {Table}: last={LastSync}, interval={Interval}m, minDT={MinDT}",
config.TableName,
tableName,
lastDaily?.EndDt?.ToString("o") ?? "never",
config.DailyConfig.IntervalMinutes,
pipeline.DailySyncIntervalMinutes,
minimumDt?.ToString("o") ?? "null");
return CreateTask(config, UpdateTypes.Daily, minimumDt);
return DataUpdateTask.FromPipeline(pipeline, UpdateTypes.Daily, minimumDt);
}
// Check Hourly
if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now))
if (pipeline.SupportsHourlySync && NeedsHourlySync(pipeline, lastHourly, lastDaily, lastMass, now))
{
var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes);
var minimumDt = CalculateMinimumDt(lastHourly, pipeline.HourlySyncIntervalMinutes!.Value);
_logger.LogDebug(
"Hourly sync needed for {Table}: last={LastSync}, interval={Interval}m, minDT={MinDT}",
config.TableName,
tableName,
lastHourly?.EndDt?.ToString("o") ?? "never",
config.HourlyConfig.IntervalMinutes,
pipeline.HourlySyncIntervalMinutes,
minimumDt?.ToString("o") ?? "null");
return CreateTask(config, UpdateTypes.Hourly, minimumDt);
return DataUpdateTask.FromPipeline(pipeline, UpdateTypes.Hourly, minimumDt);
}
return null;
@@ -131,7 +146,7 @@ public class ScheduleChecker : IScheduleChecker
/// <summary>
/// Determines if a mass sync is needed.
/// </summary>
private bool NeedsMassSync(DataSourceConfig config, DataUpdate? lastMass, DateTime now)
private static bool NeedsMassSync(EtlPipelineConfig pipeline, DataUpdate? lastMass, DateTime now)
{
// Never synced before - need mass sync
if (lastMass == null)
@@ -147,14 +162,18 @@ public class ScheduleChecker : IScheduleChecker
}
// EndDt is set for successful syncs (GetLastDataUpdatesAsync filters WasSuccessful=1)
var nextSyncDue = lastMass.EndDt!.Value.AddMinutes(config.MassConfig.IntervalMinutes);
var nextSyncDue = lastMass.EndDt!.Value.AddMinutes(pipeline.MassSyncIntervalMinutes!.Value);
return now > nextSyncDue;
}
/// <summary>
/// Determines if a daily sync is needed.
/// </summary>
private bool NeedsDailySync(DataSourceConfig config, DataUpdate? lastDaily, DataUpdate? lastMass, DateTime now)
private static bool NeedsDailySync(
EtlPipelineConfig pipeline,
DataUpdate? lastDaily,
DataUpdate? lastMass,
DateTime now)
{
// If no mass sync ever happened, we need mass first
if (lastMass == null)
@@ -175,15 +194,15 @@ public class ScheduleChecker : IScheduleChecker
}
// EndDt is set for successful syncs (GetLastDataUpdatesAsync filters WasSuccessful=1)
var nextSyncDue = lastDaily.EndDt!.Value.AddMinutes(config.DailyConfig.IntervalMinutes);
var nextSyncDue = lastDaily.EndDt!.Value.AddMinutes(pipeline.DailySyncIntervalMinutes!.Value);
return now > nextSyncDue;
}
/// <summary>
/// Determines if an hourly sync is needed.
/// </summary>
private bool NeedsHourlySync(
DataSourceConfig config,
private static bool NeedsHourlySync(
EtlPipelineConfig pipeline,
DataUpdate? lastHourly,
DataUpdate? lastDaily,
DataUpdate? lastMass,
@@ -208,7 +227,7 @@ public class ScheduleChecker : IScheduleChecker
}
// EndDt is set for successful syncs (GetLastDataUpdatesAsync filters WasSuccessful=1)
var nextSyncDue = lastHourly.EndDt!.Value.AddMinutes(config.HourlyConfig.IntervalMinutes);
var nextSyncDue = lastHourly.EndDt!.Value.AddMinutes(pipeline.HourlySyncIntervalMinutes!.Value);
return now > nextSyncDue;
}
@@ -227,22 +246,6 @@ public class ScheduleChecker : IScheduleChecker
return lastUpdate.EndDt!.Value.AddMinutes(-lookbackMinutes);
}
/// <summary>
/// Creates a data update task.
/// </summary>
private static DataUpdateTask CreateTask(DataSourceConfig config, UpdateTypes updateType, DateTime? minimumDt)
{
return new DataUpdateTask
{
TableName = config.TableName,
SourceSystem = config.SourceSystem,
SourceData = config.SourceData,
UpdateType = updateType,
MinimumDt = minimumDt,
Config = config
};
}
/// <summary>
/// Gets the dictionary key for looking up last updates.
/// </summary>
@@ -1,5 +1,6 @@
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -106,4 +107,53 @@ public class SyncOrchestrator : ISyncOrchestrator
_metrics.RecordCycleCompleted(completedCount, failedCount, elapsed.TotalSeconds);
}
/// <inheritdoc/>
public async Task ExecuteSingleSyncAsync(DataUpdateTask task, CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Executing single sync for {Table} ({Type})",
task.TableName,
task.UpdateType);
var startTime = DateTime.UtcNow;
await using var scope = _scopeFactory.CreateAsyncScope();
try
{
var operation = scope.ServiceProvider.GetRequiredService<ITableSyncOperation>();
await operation.ExecuteAsync(task, cancellationToken);
var elapsed = DateTime.UtcNow - startTime;
_logger.LogInformation(
"Single sync for {Table} ({Type}) completed in {Elapsed:F1}s",
task.TableName,
task.UpdateType,
elapsed.TotalSeconds);
_metrics.RecordCycleCompleted(1, 0, elapsed.TotalSeconds);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
_logger.LogWarning(
"Single sync for {Table} ({Type}) was cancelled",
task.TableName,
task.UpdateType);
throw;
}
catch (Exception ex)
{
var elapsed = DateTime.UtcNow - startTime;
_logger.LogError(
ex,
"Single sync for {Table} ({Type}) failed after {Elapsed:F1}s",
task.TableName,
task.UpdateType,
elapsed.TotalSeconds);
_metrics.RecordCycleCompleted(0, 1, elapsed.TotalSeconds);
throw;
}
}
}
+175 -6
View File
@@ -1,7 +1,12 @@
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Services;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using JdeScoping.Domain.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -11,7 +16,7 @@ namespace JdeScoping.DataSync;
/// <summary>
/// Unified background service that coordinates data synchronization and search processing.
/// Data freshness takes priority over search processing.
/// Priority order: Manual sync requests > Scheduled syncs > Search processing.
/// </summary>
public class WorkProcessor : BackgroundService
{
@@ -52,8 +57,9 @@ public class WorkProcessor : BackgroundService
}
_logger.LogInformation(
"WorkProcessor starting with WorkInterval={WorkInterval}",
_options.WorkInterval);
"WorkProcessor starting with WorkInterval={WorkInterval}, MaxManualRequestsPerCycle={MaxManual}",
_options.WorkInterval,
_options.MaxManualRequestsPerCycle);
// Startup cleanup
await StartupCleanupAsync(stoppingToken);
@@ -135,13 +141,21 @@ public class WorkProcessor : BackgroundService
}
/// <summary>
/// Performs one work cycle: data syncs have priority, then search processing.
/// Performs one work cycle: manual syncs (priority 1), scheduled syncs (priority 2), search processing (priority 3).
/// </summary>
private async Task<string> DoWorkAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Priority 1: Data syncs
// Priority 1: Manual sync requests (with fairness cap)
var processedManualCount = await ProcessManualSyncRequestsAsync(scope, ct);
if (processedManualCount > 0)
{
// After processing manual requests, still check scheduled syncs
// This implements the fairness policy - manual requests don't completely block scheduled syncs
}
// Priority 2: Data syncs
var scheduleChecker = scope.ServiceProvider.GetRequiredService<IScheduleChecker>();
var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct);
@@ -158,7 +172,7 @@ public class WorkProcessor : BackgroundService
return "Idle";
}
// Priority 2: Search processing (only when syncs are current)
// Priority 3: Search processing (only when syncs are current)
var searchRepository = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var search = await searchRepository.GetNextQueuedSearchAsync(ct);
@@ -176,6 +190,161 @@ public class WorkProcessor : BackgroundService
return "Idle";
}
/// <summary>
/// Processes pending manual sync requests up to the configured limit.
/// </summary>
/// <returns>The number of manual requests processed.</returns>
private async Task<int> ProcessManualSyncRequestsAsync(AsyncServiceScope scope, CancellationToken ct)
{
var manualSyncService = scope.ServiceProvider.GetService<IManualSyncRequestService>();
if (manualSyncService == null)
{
// Service not registered - skip manual sync processing
return 0;
}
var pipelineRegistry = scope.ServiceProvider.GetRequiredService<IPipelineRegistry>();
var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
var processedCount = 0;
var maxRequests = _options.MaxManualRequestsPerCycle > 0
? _options.MaxManualRequestsPerCycle
: int.MaxValue;
while (processedCount < maxRequests)
{
ct.ThrowIfCancellationRequested();
var request = await manualSyncService.GetNextPendingRequestAsync(ct);
if (request == null)
{
break;
}
await ProcessManualSyncRequestAsync(
request,
pipelineRegistry,
orchestrator,
manualSyncService,
ct);
processedCount++;
}
if (processedCount > 0)
{
_logger.LogInformation("Processed {Count} manual sync requests", processedCount);
}
return processedCount;
}
/// <summary>
/// Processes a single manual sync request.
/// </summary>
private async Task ProcessManualSyncRequestAsync(
ManualSyncRequest request,
IPipelineRegistry pipelineRegistry,
ISyncOrchestrator orchestrator,
IManualSyncRequestService manualSyncService,
CancellationToken ct)
{
_logger.LogInformation(
"Processing manual sync request #{Id}: Pipeline={Pipeline}, SyncType={SyncType}",
request.Id, request.PipelineName, request.SyncType);
await NotifyStatusSafeAsync($"Processing manual sync: {request.PipelineName}", ct);
try
{
// Get pipeline from registry
var pipeline = pipelineRegistry.GetPipeline(request.PipelineName);
if (pipeline == null)
{
_logger.LogError(
"Manual sync request #{Id} failed: Pipeline '{Name}' not found",
request.Id, request.PipelineName);
// Can't mark as failed without adding a method - just complete it
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
return;
}
if (!pipeline.IsEnabled)
{
_logger.LogError(
"Manual sync request #{Id} failed: Pipeline '{Name}' is disabled",
request.Id, request.PipelineName);
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
return;
}
// Parse sync type
if (!TryParseUpdateType(request.SyncType, out var updateType))
{
_logger.LogError(
"Manual sync request #{Id} failed: Invalid sync type '{Type}'",
request.Id, request.SyncType);
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
return;
}
// Validate that the pipeline supports this sync type
if (!pipelineRegistry.IsValidPipelineAndSyncType(request.PipelineName, request.SyncType))
{
_logger.LogError(
"Manual sync request #{Id} failed: Pipeline '{Name}' does not support {Type} sync",
request.Id, request.PipelineName, request.SyncType);
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
return;
}
// Create and execute the sync task
var task = DataUpdateTask.FromPipeline(pipeline, updateType);
await orchestrator.ExecuteSingleSyncAsync(task, ct);
// Mark request as completed
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
_logger.LogInformation(
"Manual sync request #{Id} completed successfully",
request.Id);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Manual sync request #{Id} failed with error",
request.Id);
// Try to mark as completed (best effort)
try
{
await manualSyncService.CompleteRequestAsync(request.Id, request.RowVersion, ct);
}
catch (Exception completeEx)
{
_logger.LogWarning(completeEx,
"Failed to mark manual sync request #{Id} as completed after error",
request.Id);
}
}
}
/// <summary>
/// Tries to parse a sync type string to an UpdateTypes enum value.
/// </summary>
private static bool TryParseUpdateType(string syncType, out UpdateTypes updateType)
{
updateType = syncType.ToLowerInvariant() switch
{
"mass" => UpdateTypes.Mass,
"daily" => UpdateTypes.Daily,
"hourly" => UpdateTypes.Hourly,
_ => (UpdateTypes)(-1)
};
return (int)updateType >= 0;
}
/// <summary>
/// Purges old DataUpdate entries periodically (every 24 hours).
/// </summary>