using JdeScoping.Api.Mapping;
using JdeScoping.Core.ApiContracts;
using JdeScoping.Core.Models.Pipelines;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace JdeScoping.Api.Controllers;
/// <summary>
/// API endpoints for pipeline configuration and status.
/// </summary>
[Route(ApiRoutes.Pipelines.Base)]
[Authorize]
public class PipelineController : ApiControllerBase
{
    private readonly IEtlPipelineFactory _pipelineFactory;
    private readonly IDataUpdateRepository _dataUpdateRepository;
    private readonly IPipelineMapper _mapper;

    /// <summary>
    /// Initializes a new instance of the <see cref="PipelineController"/> class.
    /// </summary>
    /// <param name="pipelineFactory">The ETL pipeline factory.</param>
    /// <param name="dataUpdateRepository">The data update repository.</param>
    /// <param name="mapper">The pipeline mapper.</param>
    public PipelineController(
        IEtlPipelineFactory pipelineFactory,
        IDataUpdateRepository dataUpdateRepository,
        IPipelineMapper mapper)
    {
        _pipelineFactory = pipelineFactory;
        _dataUpdateRepository = dataUpdateRepository;
        _mapper = mapper;
    }

    /// <summary>
    /// Gets the list of all available pipeline names, sorted ascending.
    /// </summary>
    /// <returns>A 200 response wrapping a <see cref="PipelineListResponse"/>.</returns>
    [HttpGet]
    public ActionResult GetPipelineNames()
    {
        var names = _pipelineFactory.GetAvailableTables()
            .OrderBy(n => n)
            .ToList();

        return Ok(new PipelineListResponse(names));
    }

    /// <summary>
    /// Gets the configuration for a specific pipeline.
    /// </summary>
    /// <param name="name">The pipeline name.</param>
    /// <returns>
    /// A 200 response with the mapped pipeline DTO, or 404 when the factory has
    /// no configuration for <paramref name="name"/>.
    /// </returns>
    [HttpGet(ApiRoutes.Pipelines.ByName)]
    public ActionResult GetPipeline(string name)
    {
        var config = _pipelineFactory.GetPipelineConfig(name);
        if (config is null)
        {
            return NotFound();
        }

        var defaults = _pipelineFactory.GetScheduleDefaults();
        var dto = _mapper.MapToDto(name, config, defaults);

        return Ok(dto);
    }

    /// <summary>
    /// Gets the schedule status of a pipeline for the Mass, Daily and Hourly update types.
    /// </summary>
    /// <param name="name">The pipeline name.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>
    /// A 200 response wrapping a <see cref="PipelineStatusResponse"/> with one entry per
    /// update type, or 404 when the factory has no configuration for <paramref name="name"/>.
    /// </returns>
    [HttpGet(ApiRoutes.Pipelines.Status)]
    public async Task<ActionResult<PipelineStatusResponse>> GetStatus(
        string name,
        CancellationToken cancellationToken)
    {
        var config = _pipelineFactory.GetPipelineConfig(name);
        if (config is null)
        {
            return NotFound();
        }

        var tableName = config.Destination.Table;
        var lastRuns = await _dataUpdateRepository.GetLastRunsAsync(tableName, cancellationToken);
        var lastSuccessful = await _dataUpdateRepository.GetLastDataUpdatesAsync(cancellationToken);
        var defaults = _pipelineFactory.GetScheduleDefaults();

        var statuses = new List<PipelineScheduleStatusDto>();
        foreach (var updateType in new[] { UpdateTypes.Mass, UpdateTypes.Daily, UpdateTypes.Hourly })
        {
            var scheduleConfig = _mapper.GetScheduleConfig(config, updateType);
            var interval = _mapper.GetEffectiveInterval(scheduleConfig, defaults, updateType);

            // A missing entry leaves the out variable at its default; the null-conditional
            // accesses below then report "never ran" / "never succeeded".
            lastRuns.TryGetValue(updateType, out var lastRun);

            // Successful updates are looked up by "<table>_<numeric update type>".
            var successKey = $"{tableName}_{(int)updateType}";
            lastSuccessful.TryGetValue(successKey, out var lastSuccess);

            // The next run is due one interval (in minutes) after the last successful end time.
            var nextRequired = lastSuccess?.EndDt?.AddMinutes(interval);

            // NOTE(review): IsOverdue receives null as its last argument while nextRequired
            // uses the effective interval computed above - confirm both use the same schedule.
            var isOverdue = DataUpdateRepository.IsOverdue(
                lastSuccess?.EndDt, tableName, updateType, null);

            statuses.Add(new PipelineScheduleStatusDto(
                updateType,
                lastRun?.StartDt,
                lastRun?.WasSuccessful ?? false,
                lastSuccess?.EndDt,
                nextRequired,
                isOverdue,
                interval));
        }

        return Ok(new PipelineStatusResponse(statuses));
    }

    /// <summary>
    /// Gets the recent execution history for a pipeline.
    /// </summary>
    /// <param name="name">The pipeline name.</param>
    /// <param name="count">The maximum number of recent executions to retrieve; must be greater than zero.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>
    /// A 200 response wrapping a <see cref="PipelineExecutionsResponse"/>, 400 when
    /// <paramref name="count"/> is not positive, or 404 when the factory has no
    /// configuration for <paramref name="name"/>.
    /// </returns>
    [HttpGet(ApiRoutes.Pipelines.Executions)]
    public async Task<ActionResult<PipelineExecutionsResponse>> GetExecutions(
        string name,
        [FromQuery] int count = 30,
        CancellationToken cancellationToken = default)
    {
        // count comes straight from the query string; reject values that cannot
        // describe a result size instead of handing them to the repository.
        // NOTE(review): no upper bound is enforced here - consider capping if the
        // repository does not already limit the result size.
        if (count <= 0)
        {
            return BadRequest($"'{nameof(count)}' must be greater than zero.");
        }

        var config = _pipelineFactory.GetPipelineConfig(name);
        if (config is null)
        {
            return NotFound();
        }

        var tableName = config.Destination.Table;
        var updates = await _dataUpdateRepository.GetRecentUpdatesAsync(
            tableName, null, count, cancellationToken);

        // A default EndDt is reported as "no end time", and therefore no duration either.
        var executions = updates.Select(u => new PipelineExecutionDto(
            u.UpdateType,
            u.StartDt,
            u.EndDt == default ? null : u.EndDt,
            u.EndDt == default ? null : u.EndDt - u.StartDt,
            u.NumberRecords,
            u.WasSuccessful
        )).ToList();

        return Ok(new PipelineExecutionsResponse(executions));
    }
}