61253e3269
Resolves StoreAndForward-001, ExternalSystemGateway-001, NotificationService-001 — one systemic gap where buffered messages were persisted but never delivered, and the active node never replicated its buffer to the standby. Delivery handlers (ExternalSystemGateway-001 / NotificationService-001): - AkkaHostedService registers delivery handlers for the ExternalSystem, CachedDbWrite and Notification categories after StoreAndForwardService starts; each resolves its scoped consumer in a fresh DI scope. - ExternalSystemClient, DatabaseGateway and NotificationDeliveryService each gain a DeliverBufferedAsync method: re-resolve the target and re-attempt delivery, returning true/false/throwing per the transient-vs-permanent contract. - EnqueueAsync gains an attemptImmediateDelivery flag; CachedCallAsync and NotificationDeliveryService.SendAsync pass false (they already attempted delivery themselves) so registering a handler does not dispatch twice. Replication (StoreAndForward-001): - ReplicationService is injected into StoreAndForwardService; a new BufferAsync helper replicates every enqueue, and successful-retry removes and parks are replicated too. Fire-and-forget, no-op when replication is disabled. Tests: StoreAndForwardReplicationTests (Add/Remove/Park observed), attemptImmediateDelivery behaviour, and DeliverBufferedAsync paths for each consumer. Full solution builds; StoreAndForward/ExternalSystemGateway/NotificationService suites green.
296 lines
12 KiB
C#
296 lines
12 KiB
C#
using System.Net;
|
|
using System.Net.Http.Headers;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using ScadaLink.Commons.Entities.ExternalSystems;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.StoreAndForward;
|
|
|
|
namespace ScadaLink.ExternalSystemGateway;
|
|
|
|
/// <summary>
|
|
/// WP-6: HTTP/REST client that invokes external APIs.
|
|
/// WP-7: Dual call modes — Call (synchronous) and CachedCall (S&F on transient failure).
|
|
/// WP-8: Error classification applied to HTTP responses and exceptions.
|
|
/// </summary>
|
|
public class ExternalSystemClient : IExternalSystemClient
|
|
{
|
|
// Creates the named HttpClient used for each outbound request.
private readonly IHttpClientFactory _httpClientFactory;

// Source of external system definitions and their methods.
private readonly IExternalSystemRepository _repository;

// Optional store-and-forward engine; when null, CachedCallAsync cannot buffer
// a transient failure and reports it to the caller instead.
private readonly StoreAndForwardService? _storeAndForward;

private readonly ILogger<ExternalSystemClient> _logger;

/// <summary>
/// Creates the client from its collaborators.
/// </summary>
/// <param name="httpClientFactory">Factory used to create the per-system HTTP client.</param>
/// <param name="repository">Repository used to look up systems and methods by name.</param>
/// <param name="logger">Logger for delivery diagnostics.</param>
/// <param name="storeAndForward">Optional store-and-forward service; may be omitted.</param>
public ExternalSystemClient(
    IHttpClientFactory httpClientFactory,
    IExternalSystemRepository repository,
    ILogger<ExternalSystemClient> logger,
    StoreAndForwardService? storeAndForward = null)
{
    (_httpClientFactory, _repository, _logger, _storeAndForward) =
        (httpClientFactory, repository, logger, storeAndForward);
}
|
|
|
|
/// <summary>
/// WP-7: Synchronous call mode. Makes a single attempt and reports every outcome,
/// including transient and permanent failures, in the returned result. Nothing is buffered.
/// </summary>
/// <param name="systemName">Name of the external system to call.</param>
/// <param name="methodName">Name of the method on that system.</param>
/// <param name="parameters">Optional call parameters.</param>
/// <param name="cancellationToken">Token passed through to the lookup and the HTTP call.</param>
/// <returns>
/// A successful result carrying the response body, or a failed result whose error text
/// says whether the target was not found or the failure was transient or permanent.
/// </returns>
public async Task<ExternalCallResult> CallAsync(
    string systemName,
    string methodName,
    IReadOnlyDictionary<string, object?>? parameters = null,
    CancellationToken cancellationToken = default)
{
    // Every failure path produces the same result shape; only the message differs.
    static ExternalCallResult Failure(string error) => new ExternalCallResult(false, null, error);

    var (target, operation) = await ResolveSystemAndMethodAsync(systemName, methodName, cancellationToken);
    if (target is null || operation is null)
    {
        return Failure($"External system '{systemName}' or method '{methodName}' not found");
    }

    try
    {
        var body = await InvokeHttpAsync(target, operation, parameters, cancellationToken);
        return new ExternalCallResult(true, body, null);
    }
    catch (Exception ex) when (ex is TransientExternalSystemException or PermanentExternalSystemException)
    {
        // Transient is tested first, matching the precedence of the classification.
        var label = ex is TransientExternalSystemException ? "Transient error" : "Permanent error";
        return Failure($"{label}: {ex.Message}");
    }
}
|
|
|
|
/// <summary>
/// WP-7: CachedCall — attempts the call immediately. A permanent failure is returned to the
/// script and never buffered; a transient failure is handed to store-and-forward for retry.
/// </summary>
/// <param name="systemName">Name of the external system to call.</param>
/// <param name="methodName">Name of the method on that system.</param>
/// <param name="parameters">Optional call parameters; serialized into the buffered payload.</param>
/// <param name="originInstanceName">Optional origin instance name recorded with the buffered message.</param>
/// <param name="cancellationToken">Token passed through to the lookup and the HTTP call.</param>
/// <returns>
/// Success with the response body; failure when the target is not found, the error is
/// permanent, or the error is transient and no store-and-forward service is available;
/// or success with <c>WasBuffered: true</c> when the call was queued for retry.
/// </returns>
public async Task<ExternalCallResult> CachedCallAsync(
    string systemName,
    string methodName,
    IReadOnlyDictionary<string, object?>? parameters = null,
    string? originInstanceName = null,
    CancellationToken cancellationToken = default)
{
    var (system, method) = await ResolveSystemAndMethodAsync(systemName, methodName, cancellationToken);
    if (system == null || method == null)
    {
        return new ExternalCallResult(false, null, $"External system '{systemName}' or method '{methodName}' not found");
    }

    try
    {
        var response = await InvokeHttpAsync(system, method, parameters, cancellationToken);
        return new ExternalCallResult(true, response, null);
    }
    catch (PermanentExternalSystemException ex)
    {
        // Permanent failures returned to script, never buffered
        return new ExternalCallResult(false, null, $"Permanent error: {ex.Message}");
    }
    catch (TransientExternalSystemException ex)
    {
        // The transient error's detail is not part of the returned result, so log it
        // here; otherwise the reason the call was buffered (or dropped) is lost.
        if (_storeAndForward == null)
        {
            _logger.LogWarning(
                ex,
                "Transient failure calling '{System}'/'{Method}' and store-and-forward is not available; the call was not buffered.",
                systemName, methodName);
            return new ExternalCallResult(false, null, "Transient error and store-and-forward not available");
        }

        _logger.LogWarning(
            ex,
            "Transient failure calling '{System}'/'{Method}'; buffering the call for store-and-forward retry.",
            systemName, methodName);

        // Property names here must match what DeliverBufferedAsync reads back.
        var payload = JsonSerializer.Serialize(new
        {
            SystemName = systemName,
            MethodName = methodName,
            Parameters = parameters
        });

        // attemptImmediateDelivery: false — this method already made the HTTP
        // attempt above; letting EnqueueAsync re-invoke the handler would
        // dispatch the same request a second time.
        await _storeAndForward.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            systemName,
            payload,
            originInstanceName,
            // Non-positive per-system settings are passed as null.
            system.MaxRetries > 0 ? system.MaxRetries : null,
            system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null,
            attemptImmediateDelivery: false);

        return new ExternalCallResult(true, null, null, WasBuffered: true);
    }
}
|
|
|
|
/// <summary>
/// WP-7/10: Delivers a buffered ExternalSystem call during a store-and-forward
/// retry sweep. Returns true on success, false on permanent failure (the message
/// is parked); throws <see cref="TransientExternalSystemException"/> on a
/// transient failure so the engine retries.
/// </summary>
/// <param name="message">The buffered message whose JSON payload describes the call.</param>
/// <param name="cancellationToken">Token passed through to the lookup and the HTTP call.</param>
/// <returns>
/// <c>true</c> when the call was delivered; <c>false</c> when the payload is unreadable,
/// the system or method no longer exists, or the call failed permanently.
/// </returns>
public async Task<bool> DeliverBufferedAsync(
    StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
    // A payload that is empty or not valid JSON can never succeed on retry, so it is
    // reported as "unreadable" (return false) rather than allowed to escape as an
    // exception the engine could mistake for a transient failure.
    CachedCallPayload? payload = null;
    JsonException? parseError = null;
    try
    {
        if (!string.IsNullOrWhiteSpace(message.PayloadJson))
        {
            payload = JsonSerializer.Deserialize<CachedCallPayload>(message.PayloadJson);
        }
    }
    catch (JsonException ex)
    {
        parseError = ex;
    }

    if (payload == null || string.IsNullOrEmpty(payload.SystemName) || string.IsNullOrEmpty(payload.MethodName))
    {
        _logger.LogError(parseError, "Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    var (system, method) = await ResolveSystemAndMethodAsync(
        payload.SystemName, payload.MethodName, cancellationToken);
    if (system == null || method == null)
    {
        _logger.LogError(
            "Buffered call to '{System}'/'{Method}' cannot be delivered — the system or method no longer exists; parking.",
            payload.SystemName, payload.MethodName);
        return false;
    }

    // A JSON null deserializes to a non-null JsonElement (ValueKind.Null). Map it back
    // to a real null so the retry builds the same request as the original attempt:
    // BuildUrl omits null-valued parameters from the query string.
    var parameters = payload.Parameters?.ToDictionary(
        kv => kv.Key,
        kv => kv.Value.ValueKind == JsonValueKind.Null ? null : (object?)kv.Value);

    try
    {
        await InvokeHttpAsync(system, method, parameters, cancellationToken);
        return true;
    }
    catch (PermanentExternalSystemException ex)
    {
        _logger.LogError(ex, "Buffered call to '{System}' failed permanently; parking.", payload.SystemName);
        return false;
    }
    // TransientExternalSystemException propagates — the S&F engine retries.
}
|
|
|
|
/// <summary>
/// Shape that <c>DeliverBufferedAsync</c> deserializes a buffered message's JSON payload into.
/// The property names mirror the anonymous object serialized by <c>CachedCallAsync</c>.
/// </summary>
/// <param name="SystemName">Name of the external system the call targets.</param>
/// <param name="MethodName">Name of the method on that system.</param>
/// <param name="Parameters">Call parameters as raw JSON elements; null when none were stored.</param>
private sealed record CachedCallPayload(string SystemName, string MethodName, Dictionary<string, JsonElement>? Parameters);
|
|
|
|
/// <summary>
/// WP-6: Executes the HTTP request against the external system.
/// </summary>
/// <remarks>
/// The request URL is produced by <c>BuildUrl</c> and authentication by <c>ApplyAuth</c>.
/// For POST, PUT and PATCH a non-empty parameter set is sent as a JSON body.
/// </remarks>
/// <param name="system">Definition of the target system (name, endpoint, auth).</param>
/// <param name="method">Definition of the method to invoke (HTTP verb and path).</param>
/// <param name="parameters">Optional call parameters.</param>
/// <param name="cancellationToken">Token passed to the send and to reading the response body.</param>
/// <returns>The response body when the status code indicates success.</returns>
/// <exception cref="PermanentExternalSystemException">
/// The response status is unsuccessful and is not classified as transient by <c>ErrorClassifier</c>.
/// </exception>
internal async Task<string?> InvokeHttpAsync(
    ExternalSystemDefinition system,
    ExternalSystemMethod method,
    IReadOnlyDictionary<string, object?>? parameters,
    CancellationToken cancellationToken)
{
    var client = _httpClientFactory.CreateClient($"ExternalSystem_{system.Name}");

    var url = BuildUrl(system.EndpointUrl, method.Path, parameters, method.HttpMethod);

    // The request (and its content) is disposed once the call has completed.
    using var request = new HttpRequestMessage(new HttpMethod(method.HttpMethod), url);

    // Apply authentication
    ApplyAuth(request, system);

    // For POST/PUT/PATCH, send parameters as JSON body
    var sendsBody =
        method.HttpMethod.Equals("POST", StringComparison.OrdinalIgnoreCase) ||
        method.HttpMethod.Equals("PUT", StringComparison.OrdinalIgnoreCase) ||
        method.HttpMethod.Equals("PATCH", StringComparison.OrdinalIgnoreCase);
    if (sendsBody && parameters != null && parameters.Count > 0)
    {
        request.Content = new StringContent(
            JsonSerializer.Serialize(parameters),
            Encoding.UTF8,
            "application/json");
    }

    HttpResponseMessage response;
    try
    {
        response = await client.SendAsync(request, cancellationToken);
    }
    catch (Exception ex) when (ErrorClassifier.IsTransient(ex))
    {
        // Only exceptions the classifier considers transient are wrapped; others propagate as-is.
        throw ErrorClassifier.AsTransient($"Connection error to {system.Name}: {ex.Message}", ex);
    }

    // Dispose the response on every exit path (success or either failure classification).
    using (response)
    {
        // The body is needed on all paths: as the result on success, as error detail otherwise.
        var body = await response.Content.ReadAsStringAsync(cancellationToken);

        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        if (ErrorClassifier.IsTransient(response.StatusCode))
        {
            throw ErrorClassifier.AsTransient(
                $"HTTP {(int)response.StatusCode} from {system.Name}: {body}");
        }

        throw new PermanentExternalSystemException(
            $"HTTP {(int)response.StatusCode} from {system.Name}: {body}",
            (int)response.StatusCode);
    }
}
|
|
|
|
/// <summary>
/// Combines the base URL and method path and, for GET/DELETE, appends the non-null
/// parameters as an escaped query string.
/// </summary>
/// <param name="baseUrl">Endpoint base URL; trailing slashes are ignored.</param>
/// <param name="path">Method path; leading slashes are ignored.</param>
/// <param name="parameters">Optional parameters; entries with a null value are skipped.</param>
/// <param name="httpMethod">HTTP verb, compared case-insensitively.</param>
/// <returns>The request URL.</returns>
private static string BuildUrl(string baseUrl, string path, IReadOnlyDictionary<string, object?>? parameters, string httpMethod)
{
    var url = baseUrl.TrimEnd('/') + "/" + path.TrimStart('/');

    // Only GET/DELETE carry parameters in the URL; other verbs send them in the body.
    var usesQueryString =
        httpMethod.Equals("GET", StringComparison.OrdinalIgnoreCase) ||
        httpMethod.Equals("DELETE", StringComparison.OrdinalIgnoreCase);
    if (!usesQueryString || parameters == null || parameters.Count == 0)
    {
        return url;
    }

    // Values are formatted with the invariant culture so numbers and dates do not
    // change with the host's regional settings (e.g. "1.5" never becomes "1,5").
    var queryString = string.Join("&",
        parameters
            .Where(p => p.Value != null)
            .Select(p =>
            {
                var text = Convert.ToString(p.Value, System.Globalization.CultureInfo.InvariantCulture) ?? "";
                return $"{Uri.EscapeDataString(p.Key)}={Uri.EscapeDataString(text)}";
            }));

    // Every value was null: nothing to append, and no dangling '?'.
    if (queryString.Length == 0)
    {
        return url;
    }

    // Extend an existing query string (from the base URL or path) instead of adding a second '?'.
    var separator = url.Contains('?') ? "&" : "?";
    return url + separator + queryString;
}
|
|
|
|
/// <summary>
/// Adds authentication headers to the request according to the system's auth type.
/// Does nothing when no auth configuration is set or the auth type is not recognised.
/// </summary>
/// <param name="request">The outgoing request to decorate.</param>
/// <param name="system">System definition supplying the auth type and configuration.</param>
private static void ApplyAuth(HttpRequestMessage request, ExternalSystemDefinition system)
{
    var config = system.AuthConfiguration;
    if (string.IsNullOrEmpty(config))
    {
        return;
    }

    var authType = system.AuthType.ToLowerInvariant();

    // Both supported formats split on the FIRST ':' only, so the remainder may contain colons.
    var colonAt = config.IndexOf(':');
    var hasTwoParts = colonAt >= 0;

    if (authType == "apikey")
    {
        // Auth config format: "HeaderName:KeyValue" or just "KeyValue" (default header: X-API-Key)
        if (hasTwoParts)
        {
            request.Headers.TryAddWithoutValidation(config[..colonAt], config[(colonAt + 1)..]);
        }
        else
        {
            request.Headers.TryAddWithoutValidation("X-API-Key", config);
        }
    }
    else if (authType == "basic")
    {
        // Auth config format: "username:password" — already the exact text to encode.
        // A configuration without ':' adds no header.
        if (hasTwoParts)
        {
            var token = Convert.ToBase64String(Encoding.UTF8.GetBytes(config));
            request.Headers.Authorization = new AuthenticationHeaderValue("Basic", token);
        }
    }
}
|
|
|
|
/// <summary>
/// Looks up an external system and one of its methods by name, ignoring case.
/// </summary>
/// <param name="systemName">Name of the external system.</param>
/// <param name="methodName">Name of the method on that system.</param>
/// <param name="cancellationToken">Token passed to both repository queries.</param>
/// <returns>
/// <c>(null, null)</c> when the system is not found; otherwise the system together with
/// the matching method, which is null when the system has no method of that name.
/// </returns>
private async Task<(ExternalSystemDefinition? system, ExternalSystemMethod? method)> ResolveSystemAndMethodAsync(
    string systemName,
    string methodName,
    CancellationToken cancellationToken)
{
    static bool SameName(string candidate, string wanted) =>
        candidate.Equals(wanted, StringComparison.OrdinalIgnoreCase);

    var definitions = await _repository.GetAllExternalSystemsAsync(cancellationToken);
    var definition = definitions.FirstOrDefault(d => SameName(d.Name, systemName));
    if (definition is null)
    {
        // Unknown system: the method query is skipped entirely.
        return (null, null);
    }

    var operations = await _repository.GetMethodsByExternalSystemIdAsync(definition.Id, cancellationToken);
    return (definition, operations.FirstOrDefault(o => SameName(o.Name, methodName)));
}
|
|
}
|