diff --git a/NEW/src/JdeScoping.Api/Controllers/PipelineController.cs b/NEW/src/JdeScoping.Api/Controllers/PipelineController.cs new file mode 100644 index 0000000..e7ae136 --- /dev/null +++ b/NEW/src/JdeScoping.Api/Controllers/PipelineController.cs @@ -0,0 +1,212 @@ +using JdeScoping.Core.ApiContracts; +using JdeScoping.Core.ApiContracts.Pipelines; +using JdeScoping.Core.Models.Enums; +using JdeScoping.DataSync.Configuration; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace JdeScoping.Api.Controllers; + +/// +/// API endpoints for pipeline configuration and status. +/// +[ApiController] +[Route(ApiRoutes.Pipelines.Base)] +[Authorize] +public class PipelineController : ControllerBase +{ + private readonly IEtlPipelineFactory _pipelineFactory; + private readonly IDataUpdateRepository _dataUpdateRepository; + + public PipelineController( + IEtlPipelineFactory pipelineFactory, + IDataUpdateRepository dataUpdateRepository) + { + _pipelineFactory = pipelineFactory; + _dataUpdateRepository = dataUpdateRepository; + } + + /// + /// Gets list of all available pipeline names. + /// + [HttpGet] + public ActionResult GetPipelineNames() + { + var names = _pipelineFactory.GetAvailableTables() + .OrderBy(n => n) + .ToList(); + return Ok(new PipelineListResponse(names)); + } + + /// + /// Gets configuration for a specific pipeline. + /// + [HttpGet(ApiRoutes.Pipelines.ByName)] + public ActionResult GetPipeline(string name) + { + var config = _pipelineFactory.GetPipelineConfig(name); + if (config is null) + return NotFound(); + + var defaults = _pipelineFactory.GetScheduleDefaults(); + var dto = MapToDto(name, config, defaults); + return Ok(dto); + } + + /// + /// Gets schedule status for a pipeline. + /// + [HttpGet(ApiRoutes.Pipelines.Status)] + public async Task> GetStatus( + string name, + CancellationToken cancellationToken) + { + var config = _pipelineFactory.GetPipelineConfig(name); + if (config is null) + return NotFound(); + + var tableName = config.Destination.Table; + var lastRuns = await _dataUpdateRepository.GetLastRunsAsync(tableName, cancellationToken); + var lastSuccessful = await _dataUpdateRepository.GetLastDataUpdatesAsync(cancellationToken); + var defaults = _pipelineFactory.GetScheduleDefaults(); + + var statuses = new List(); + foreach (var updateType in new[] { UpdateTypes.Mass, UpdateTypes.Daily, UpdateTypes.Hourly }) + { + var scheduleConfig = GetScheduleConfig(config, updateType); + var interval = GetEffectiveInterval(scheduleConfig, defaults, updateType); + + lastRuns.TryGetValue(updateType, out var lastRun); + var successKey = $"{tableName}_{(int)updateType}"; + lastSuccessful.TryGetValue(successKey, out var lastSuccess); + + var nextRequired = lastSuccess?.EndDt.AddMinutes(interval); + var isOverdue = DataUpdateRepository.IsOverdue( + lastSuccess?.EndDt, tableName, updateType, null); + + statuses.Add(new PipelineScheduleStatusDto( + updateType, + lastRun?.StartDt, + lastRun?.WasSuccessful ?? false, + lastSuccess?.EndDt, + nextRequired, + isOverdue, + interval)); + } + + return Ok(new PipelineStatusResponse(statuses)); + } + + /// + /// Gets recent execution history for a pipeline. + /// + [HttpGet(ApiRoutes.Pipelines.Executions)] + public async Task> GetExecutions( + string name, + [FromQuery] int count = 30, + CancellationToken cancellationToken = default) + { + var config = _pipelineFactory.GetPipelineConfig(name); + if (config is null) + return NotFound(); + + var tableName = config.Destination.Table; + var updates = await _dataUpdateRepository.GetRecentUpdatesAsync( + tableName, null, count, cancellationToken); + + var executions = updates.Select(u => new PipelineExecutionDto( + u.UpdateType, + u.StartDt, + u.EndDt == default ? null : u.EndDt, + u.EndDt == default ? null : u.EndDt - u.StartDt, + u.NumberRecords, + u.WasSuccessful + )).ToList(); + + return Ok(new PipelineExecutionsResponse(executions)); + } + + private static PipelineConfigDto MapToDto( + string name, + PipelineConfig config, + ScheduleDefaults defaults) + { + var source = new PipelineSourceDto( + config.Source.Connection, + Truncate(config.Source.Query), + Truncate(config.Source.MassQuery), + config.Source.Query, + config.Source.MassQuery, + config.Source.Parameters?.Select(p => new PipelineParameterDto( + p.Key, p.Value.Format, p.Value.Source)).ToList() ?? []); + + var matchCols = config.Destination.MatchColumns?.ToList(); + var destination = new PipelineDestinationDto( + config.Destination.Table, + matchCols?.Count > 0 ? "BulkMerge" : "BulkImport", + matchCols, + config.Destination.ExcludeFromUpdate?.ToList()); + + var schedules = new PipelineSchedulesDto( + MapSchedule(config.Schedules?.Mass, defaults.Mass), + MapSchedule(config.Schedules?.Daily, defaults.Daily), + MapSchedule(config.Schedules?.Hourly, defaults.Hourly)); + + return new PipelineConfigDto( + name, + source, + destination, + schedules, + config.PreScripts?.Count ?? 0, + config.PostScripts?.Count ?? 0, + config.PreScripts, + config.PostScripts); + } + + private static PipelineScheduleDto MapSchedule( + ScheduleConfig? config, + ScheduleConfig defaults) + { + return new PipelineScheduleDto( + config?.Enabled ?? defaults.Enabled, + config?.IntervalMinutes > 0 ? config.IntervalMinutes : defaults.IntervalMinutes, + config?.PrePurge ?? defaults.PrePurge, + config?.ReIndex ?? defaults.ReIndex, + config?.IntervalMinutes > 0 && config.IntervalMinutes != defaults.IntervalMinutes, + config?.PrePurge != null && config.PrePurge != defaults.PrePurge, + config?.ReIndex != null && config.ReIndex != defaults.ReIndex); + } + + private static ScheduleConfig? GetScheduleConfig( + PipelineConfig config, + UpdateTypes updateType) => updateType switch + { + UpdateTypes.Mass => config.Schedules?.Mass, + UpdateTypes.Daily => config.Schedules?.Daily, + UpdateTypes.Hourly => config.Schedules?.Hourly, + _ => null + }; + + private static int GetEffectiveInterval( + ScheduleConfig? config, + ScheduleDefaults defaults, + UpdateTypes updateType) + { + if (config?.IntervalMinutes > 0) + return config.IntervalMinutes; + + return updateType switch + { + UpdateTypes.Mass => defaults.Mass.IntervalMinutes, + UpdateTypes.Daily => defaults.Daily.IntervalMinutes, + UpdateTypes.Hourly => defaults.Hourly.IntervalMinutes, + _ => 60 + }; + } + + private static string? Truncate(string? value, int maxLength = 100) => + value is null ? null : + value.Length <= maxLength ? value : value[..maxLength] + "..."; +} diff --git a/NEW/src/JdeScoping.Api/JdeScoping.Api.csproj b/NEW/src/JdeScoping.Api/JdeScoping.Api.csproj index d558569..5b9b0c9 100644 --- a/NEW/src/JdeScoping.Api/JdeScoping.Api.csproj +++ b/NEW/src/JdeScoping.Api/JdeScoping.Api.csproj @@ -12,6 +12,7 @@ + diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs index a0bf9c6..5a012c7 100644 --- a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs +++ b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs @@ -1,11 +1,36 @@ using JdeScoping.Core.Models.Enums; +using JdeScoping.DataSync.Configuration; using JdeScoping.DataSync.Etl.Pipeline; namespace JdeScoping.DataSync.Contracts; public interface IEtlPipelineFactory { + /// + /// Creates a pipeline builder for the specified table. + /// + /// The table name (pipeline key). + /// A builder for configuring the pipeline. IEtlPipelineBuilder ForTable(string tableName); + + /// + /// Gets the list of available table names (pipeline keys). + /// + /// List of table names with configured pipelines. + IReadOnlyList GetAvailableTables(); + + /// + /// Gets the configuration for a specific pipeline. + /// + /// The table name (pipeline key). + /// The pipeline configuration, or null if not found. + PipelineConfig? GetPipelineConfig(string tableName); + + /// + /// Gets the schedule defaults from the configuration. + /// + /// The schedule defaults. + ScheduleDefaults GetScheduleDefaults(); } public interface IEtlPipelineBuilder diff --git a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs index 261ed62..5efa4db 100644 --- a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs +++ b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs @@ -90,6 +90,24 @@ public class EtlPipelineFactory : IEtlPipelineFactory _logger); } + /// + public IReadOnlyList GetAvailableTables() + { + return _config.Pipelines.Keys.ToList().AsReadOnly(); + } + + /// + public PipelineConfig? GetPipelineConfig(string tableName) + { + return _config.Pipelines.TryGetValue(tableName, out var config) ? config : null; + } + + /// + public ScheduleDefaults GetScheduleDefaults() + { + return _config.EffectiveScheduleDefaults; + } + private PipelinesRoot LoadPipelineConfigs(string configPath) { // Resolve path relative to assembly location (handles both debug and publish)