feat(api): add PipelineController and factory methods for pipeline viewer

- Add GetAvailableTables, GetPipelineConfig, GetScheduleDefaults to IEtlPipelineFactory
- Implement new methods in EtlPipelineFactory
- Create PipelineController with endpoints:
  - GET /api/pipelines - list all pipeline names
  - GET /api/pipelines/{name} - get pipeline configuration
  - GET /api/pipelines/{name}/status - get schedule status
  - GET /api/pipelines/{name}/executions - get execution history
- Add JdeScoping.DataSync reference to JdeScoping.Api
This commit is contained in:
Joseph Doherty
2026-01-07 08:06:45 -05:00
parent 33a04f4022
commit 676f090fc8
4 changed files with 256 additions and 0 deletions
@@ -0,0 +1,212 @@
using JdeScoping.Core.ApiContracts;
using JdeScoping.Core.ApiContracts.Pipelines;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace JdeScoping.Api.Controllers;
/// <summary>
/// API endpoints for pipeline configuration and status.
/// </summary>
[ApiController]
[Route(ApiRoutes.Pipelines.Base)]
[Authorize]
public class PipelineController : ControllerBase
{
private readonly IEtlPipelineFactory _pipelineFactory;
private readonly IDataUpdateRepository _dataUpdateRepository;
public PipelineController(
IEtlPipelineFactory pipelineFactory,
IDataUpdateRepository dataUpdateRepository)
{
_pipelineFactory = pipelineFactory;
_dataUpdateRepository = dataUpdateRepository;
}
/// <summary>
/// Gets list of all available pipeline names.
/// </summary>
[HttpGet]
public ActionResult<PipelineListResponse> GetPipelineNames()
{
var names = _pipelineFactory.GetAvailableTables()
.OrderBy(n => n)
.ToList();
return Ok(new PipelineListResponse(names));
}
/// <summary>
/// Gets configuration for a specific pipeline.
/// </summary>
[HttpGet(ApiRoutes.Pipelines.ByName)]
public ActionResult<PipelineConfigDto> GetPipeline(string name)
{
var config = _pipelineFactory.GetPipelineConfig(name);
if (config is null)
return NotFound();
var defaults = _pipelineFactory.GetScheduleDefaults();
var dto = MapToDto(name, config, defaults);
return Ok(dto);
}
/// <summary>
/// Gets schedule status for a pipeline.
/// </summary>
[HttpGet(ApiRoutes.Pipelines.Status)]
public async Task<ActionResult<PipelineStatusResponse>> GetStatus(
string name,
CancellationToken cancellationToken)
{
var config = _pipelineFactory.GetPipelineConfig(name);
if (config is null)
return NotFound();
var tableName = config.Destination.Table;
var lastRuns = await _dataUpdateRepository.GetLastRunsAsync(tableName, cancellationToken);
var lastSuccessful = await _dataUpdateRepository.GetLastDataUpdatesAsync(cancellationToken);
var defaults = _pipelineFactory.GetScheduleDefaults();
var statuses = new List<PipelineScheduleStatusDto>();
foreach (var updateType in new[] { UpdateTypes.Mass, UpdateTypes.Daily, UpdateTypes.Hourly })
{
var scheduleConfig = GetScheduleConfig(config, updateType);
var interval = GetEffectiveInterval(scheduleConfig, defaults, updateType);
lastRuns.TryGetValue(updateType, out var lastRun);
var successKey = $"{tableName}_{(int)updateType}";
lastSuccessful.TryGetValue(successKey, out var lastSuccess);
var nextRequired = lastSuccess?.EndDt.AddMinutes(interval);
var isOverdue = DataUpdateRepository.IsOverdue(
lastSuccess?.EndDt, tableName, updateType, null);
statuses.Add(new PipelineScheduleStatusDto(
updateType,
lastRun?.StartDt,
lastRun?.WasSuccessful ?? false,
lastSuccess?.EndDt,
nextRequired,
isOverdue,
interval));
}
return Ok(new PipelineStatusResponse(statuses));
}
/// <summary>
/// Gets recent execution history for a pipeline.
/// </summary>
[HttpGet(ApiRoutes.Pipelines.Executions)]
public async Task<ActionResult<PipelineExecutionsResponse>> GetExecutions(
string name,
[FromQuery] int count = 30,
CancellationToken cancellationToken = default)
{
var config = _pipelineFactory.GetPipelineConfig(name);
if (config is null)
return NotFound();
var tableName = config.Destination.Table;
var updates = await _dataUpdateRepository.GetRecentUpdatesAsync(
tableName, null, count, cancellationToken);
var executions = updates.Select(u => new PipelineExecutionDto(
u.UpdateType,
u.StartDt,
u.EndDt == default ? null : u.EndDt,
u.EndDt == default ? null : u.EndDt - u.StartDt,
u.NumberRecords,
u.WasSuccessful
)).ToList();
return Ok(new PipelineExecutionsResponse(executions));
}
private static PipelineConfigDto MapToDto(
string name,
PipelineConfig config,
ScheduleDefaults defaults)
{
var source = new PipelineSourceDto(
config.Source.Connection,
Truncate(config.Source.Query),
Truncate(config.Source.MassQuery),
config.Source.Query,
config.Source.MassQuery,
config.Source.Parameters?.Select(p => new PipelineParameterDto(
p.Key, p.Value.Format, p.Value.Source)).ToList() ?? []);
var matchCols = config.Destination.MatchColumns?.ToList();
var destination = new PipelineDestinationDto(
config.Destination.Table,
matchCols?.Count > 0 ? "BulkMerge" : "BulkImport",
matchCols,
config.Destination.ExcludeFromUpdate?.ToList());
var schedules = new PipelineSchedulesDto(
MapSchedule(config.Schedules?.Mass, defaults.Mass),
MapSchedule(config.Schedules?.Daily, defaults.Daily),
MapSchedule(config.Schedules?.Hourly, defaults.Hourly));
return new PipelineConfigDto(
name,
source,
destination,
schedules,
config.PreScripts?.Count ?? 0,
config.PostScripts?.Count ?? 0,
config.PreScripts,
config.PostScripts);
}
private static PipelineScheduleDto MapSchedule(
ScheduleConfig? config,
ScheduleConfig defaults)
{
return new PipelineScheduleDto(
config?.Enabled ?? defaults.Enabled,
config?.IntervalMinutes > 0 ? config.IntervalMinutes : defaults.IntervalMinutes,
config?.PrePurge ?? defaults.PrePurge,
config?.ReIndex ?? defaults.ReIndex,
config?.IntervalMinutes > 0 && config.IntervalMinutes != defaults.IntervalMinutes,
config?.PrePurge != null && config.PrePurge != defaults.PrePurge,
config?.ReIndex != null && config.ReIndex != defaults.ReIndex);
}
private static ScheduleConfig? GetScheduleConfig(
PipelineConfig config,
UpdateTypes updateType) => updateType switch
{
UpdateTypes.Mass => config.Schedules?.Mass,
UpdateTypes.Daily => config.Schedules?.Daily,
UpdateTypes.Hourly => config.Schedules?.Hourly,
_ => null
};
private static int GetEffectiveInterval(
ScheduleConfig? config,
ScheduleDefaults defaults,
UpdateTypes updateType)
{
if (config?.IntervalMinutes > 0)
return config.IntervalMinutes;
return updateType switch
{
UpdateTypes.Mass => defaults.Mass.IntervalMinutes,
UpdateTypes.Daily => defaults.Daily.IntervalMinutes,
UpdateTypes.Hourly => defaults.Hourly.IntervalMinutes,
_ => 60
};
}
private static string? Truncate(string? value, int maxLength = 100) =>
value is null ? null :
value.Length <= maxLength ? value : value[..maxLength] + "...";
}
@@ -12,6 +12,7 @@
<ItemGroup>
<ProjectReference Include="..\JdeScoping.Core\JdeScoping.Core.csproj" />
<ProjectReference Include="..\JdeScoping.DataSync\JdeScoping.DataSync.csproj" />
<ProjectReference Include="..\JdeScoping.ExcelIO\JdeScoping.ExcelIO.csproj" />
<ProjectReference Include="..\JdeScoping.Infrastructure\JdeScoping.Infrastructure.csproj" />
</ItemGroup>
@@ -1,11 +1,36 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Etl.Pipeline;
namespace JdeScoping.DataSync.Contracts;
public interface IEtlPipelineFactory
{
/// <summary>
/// Creates a pipeline builder for the specified table.
/// </summary>
/// <param name="tableName">The table name (pipeline key).</param>
/// <returns>A builder for configuring the pipeline.</returns>
IEtlPipelineBuilder ForTable(string tableName);
/// <summary>
/// Gets the list of available table names (pipeline keys).
/// </summary>
/// <returns>List of table names with configured pipelines.</returns>
IReadOnlyList<string> GetAvailableTables();
/// <summary>
/// Gets the configuration for a specific pipeline.
/// </summary>
/// <param name="tableName">The table name (pipeline key).</param>
/// <returns>The pipeline configuration, or null if not found.</returns>
PipelineConfig? GetPipelineConfig(string tableName);
/// <summary>
/// Gets the schedule defaults from the configuration.
/// </summary>
/// <returns>The schedule defaults.</returns>
ScheduleDefaults GetScheduleDefaults();
}
public interface IEtlPipelineBuilder
@@ -90,6 +90,24 @@ public class EtlPipelineFactory : IEtlPipelineFactory
_logger);
}
/// <inheritdoc />
public IReadOnlyList<string> GetAvailableTables()
{
return _config.Pipelines.Keys.ToList().AsReadOnly();
}
/// <inheritdoc />
public PipelineConfig? GetPipelineConfig(string tableName)
{
return _config.Pipelines.TryGetValue(tableName, out var config) ? config : null;
}
/// <inheritdoc />
public ScheduleDefaults GetScheduleDefaults()
{
return _config.EffectiveScheduleDefaults;
}
private PipelinesRoot LoadPipelineConfigs(string configPath)
{
// Resolve path relative to assembly location (handles both debug and publish)