The store-and-forward retry loop emits the per-attempt and terminal cached audit rows (ApiCallCached/DbWriteCached Attempted, CachedResolve) via CachedCallLifecycleBridge from a CachedCallAttemptContext, not from the script context. ExecutionId (and SourceScript) were not threaded through the S&F buffer, so those rows had ExecutionId = null and SourceScript = null.

Thread both, additively, from the cached-call enqueue path:

- StoreAndForwardMessage gains ExecutionId (Guid?) / SourceScript (string?).
- StoreAndForwardStorage adds nullable execution_id / source_script columns via an idempotent PRAGMA-probed ALTER TABLE migration; rows persisted by an older build read back null (back-compat).
- StoreAndForwardService.EnqueueAsync gains optional executionId / sourceScript params, stamped onto the buffered message and surfaced on the CachedCallAttemptContext built in the retry loop.
- CachedCallAttemptContext gains ExecutionId / SourceScript.
- CachedCallLifecycleBridge.BuildPacket sets AuditEvent.ExecutionId and AuditEvent.SourceScript from the context (replacing the hard-coded SourceScript = null and its now-stale comment).
- IExternalSystemClient.CachedCallAsync / IDatabaseGateway.CachedWriteAsync gain optional executionId / sourceScript params; ScriptRuntimeContext's CachedCall / CachedWrite helpers pass _executionId / _sourceScript.

Script-side cached rows (CachedSubmit, immediate Attempted+Resolve) are unchanged. All threading is additive — old buffered S&F rows still deserialize and process with the new fields null.
207 lines
8.7 KiB
C#
207 lines
8.7 KiB
C#
using System.Data.Common;
|
|
using System.Text.Json;
|
|
using Microsoft.Data.SqlClient;
|
|
using Microsoft.Extensions.Logging;
|
|
using ScadaLink.Commons.Entities.ExternalSystems;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.StoreAndForward;
|
|
|
|
namespace ScadaLink.ExternalSystemGateway;
|
|
|
|
/// <summary>
/// WP-9: Database access from scripts.
/// Database.Connection("name") — returns ADO.NET SqlConnection (connection pooling).
/// Database.CachedWrite("name", "sql", params) — submits to the store-and-forward engine.
/// </summary>
public class DatabaseGateway : IDatabaseGateway
{
    private readonly IExternalSystemRepository _repository;
    private readonly StoreAndForwardService? _storeAndForward;
    private readonly ILogger<DatabaseGateway> _logger;

    /// <summary>
    /// Creates the gateway.
    /// </summary>
    /// <param name="repository">Repository used to look up database connection definitions by name.</param>
    /// <param name="logger">Logger used to report buffered writes that cannot be delivered.</param>
    /// <param name="storeAndForward">
    /// Optional store-and-forward service. When null, <see cref="CachedWriteAsync"/> throws
    /// because cached writes cannot be buffered.
    /// </param>
    public DatabaseGateway(
        IExternalSystemRepository repository,
        ILogger<DatabaseGateway> logger,
        StoreAndForwardService? storeAndForward = null)
    {
        _repository = repository;
        _logger = logger;
        _storeAndForward = storeAndForward;
    }

    /// <summary>
    /// Returns an open connection for the named database connection definition.
    /// Connection pooling is managed by the underlying ADO.NET provider.
    /// </summary>
    /// <param name="connectionName">Name of the database connection definition to resolve.</param>
    /// <param name="cancellationToken">Token observed by the lookup and by <c>OpenAsync</c>.</param>
    /// <returns>An already-opened connection; the caller owns it and must dispose it.</returns>
    /// <exception cref="InvalidOperationException">No connection definition with that name exists.</exception>
    public async Task<DbConnection> GetConnectionAsync(
        string connectionName,
        CancellationToken cancellationToken = default)
    {
        var definition = await ResolveConnectionAsync(connectionName, cancellationToken);
        if (definition is null)
        {
            throw new InvalidOperationException($"Database connection '{connectionName}' not found");
        }

        var connection = CreateConnection(definition.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
        }
        catch
        {
            // OpenAsync failed (unreachable server, bad credentials, cancellation) —
            // dispose the just-created connection before the exception propagates so
            // it is not leaked (ExternalSystemGateway-010).
            await connection.DisposeAsync();
            throw;
        }

        return connection;
    }

    /// <summary>
    /// Creates the underlying ADO.NET connection for a connection string. Virtual so
    /// tests can substitute a connection (for example one whose <c>OpenAsync</c> fails).
    /// Used by both <see cref="GetConnectionAsync"/> and <see cref="DeliverBufferedAsync"/>.
    /// </summary>
    internal virtual DbConnection CreateConnection(string connectionString) =>
        new SqlConnection(connectionString);

    /// <summary>
    /// Submits a SQL write to the store-and-forward engine for reliable delivery.
    /// </summary>
    /// <param name="connectionName">Name of the database connection definition the write targets.</param>
    /// <param name="sql">SQL text to execute when the buffered write is delivered.</param>
    /// <param name="parameters">Optional command parameters, serialized into the buffered payload.</param>
    /// <param name="originInstanceName">Optional origin instance name passed through to the enqueue call.</param>
    /// <param name="cancellationToken">Token observed by the connection-definition lookup.</param>
    /// <param name="trackedOperationId">
    /// Audit Log #23 (M3): used as the store-and-forward message id so the retry loop can
    /// recover it via <c>StoreAndForwardMessage.Id</c> and emit per-attempt /
    /// terminal cached-write telemetry (Tasks E4/E5). Null preserves the
    /// pre-M3 behaviour (the store-and-forward engine mints a random GUID).
    /// </param>
    /// <param name="executionId">
    /// Audit Log #23 (ExecutionId Task 4): id of the originating script execution,
    /// stamped onto the buffered row.
    /// </param>
    /// <param name="sourceScript">
    /// Audit Log #23 (ExecutionId Task 4): name of the originating script,
    /// stamped onto the buffered row.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// The named connection does not exist, or no store-and-forward service was supplied.
    /// </exception>
    public async Task CachedWriteAsync(
        string connectionName,
        string sql,
        IReadOnlyDictionary<string, object?>? parameters = null,
        string? originInstanceName = null,
        CancellationToken cancellationToken = default,
        TrackedOperationId? trackedOperationId = null,
        Guid? executionId = null,
        string? sourceScript = null)
    {
        var definition = await ResolveConnectionAsync(connectionName, cancellationToken);
        if (definition is null)
        {
            throw new InvalidOperationException($"Database connection '{connectionName}' not found");
        }

        if (_storeAndForward is null)
        {
            throw new InvalidOperationException("Store-and-forward service not available for cached writes");
        }

        // Property names here must stay in sync with CachedWritePayload, which is
        // what DeliverBufferedAsync deserializes this JSON back into.
        var payload = JsonSerializer.Serialize(new
        {
            ConnectionName = connectionName,
            Sql = sql,
            Parameters = parameters
        });

        // ExternalSystemGateway-015: the entity's MaxRetries is a non-nullable int
        // whose default is 0, and the Store-and-Forward engine interprets a stored
        // MaxRetries of 0 as "no limit" (retry forever) — see
        // StoreAndForwardMessage.MaxRetries ("0 = no limit") and the retry-sweep
        // guard `MaxRetries > 0 && ...`. Passing 0 verbatim would turn every
        // unconfigured cached write into an unbounded retry loop. A 0 is treated as
        // "unset" and passed as null so the bounded S&F default applies; the
        // RetryDelay default of TimeSpan.Zero is likewise unset.
        await _storeAndForward.EnqueueAsync(
            StoreAndForwardCategory.CachedDbWrite,
            connectionName,
            payload,
            originInstanceName,
            definition.MaxRetries > 0 ? definition.MaxRetries : null,
            definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null,
            // Audit Log #23 (M3): pin the S&F message id to the
            // TrackedOperationId so the retry loop (Bundle E Tasks E4/E5) can
            // read it back via StoreAndForwardMessage.Id and emit per-attempt +
            // terminal cached-write telemetry. Null -> S&F mints its own GUID
            // (legacy pre-M3 behaviour).
            messageId: trackedOperationId?.ToString(),
            // Audit Log #23 (ExecutionId Task 4): thread the originating script
            // execution's ExecutionId + SourceScript onto the buffered row so
            // the retry-loop cached-write audit rows carry the same provenance
            // the script-side cached rows do.
            executionId: executionId,
            sourceScript: sourceScript);
    }

    /// <summary>
    /// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry
    /// sweep — executes the SQL against the named connection.
    /// </summary>
    /// <param name="message">Buffered message whose <c>PayloadJson</c> holds the cached write.</param>
    /// <param name="cancellationToken">Token observed while resolving, opening and executing.</param>
    /// <returns>
    /// True on success; false if the payload is unreadable (malformed JSON or missing
    /// connection name / SQL) or the connection no longer exists — in both cases the
    /// message is parked because a retry cannot succeed.
    /// </returns>
    /// <remarks>
    /// Throws on any connection or execution error so the engine retries.
    /// </remarks>
    public async Task<bool> DeliverBufferedAsync(
        StoreAndForwardMessage message, CancellationToken cancellationToken = default)
    {
        var payload = ReadPayloadOrNull(message);
        if (payload is null)
        {
            return false;
        }

        var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken);
        if (definition is null)
        {
            _logger.LogError(
                "Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.",
                payload.ConnectionName);
            return false;
        }

        // Go through the same factory as GetConnectionAsync so both delivery paths
        // create connections identically (and tests can substitute either one).
        await using var connection = CreateConnection(definition.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = connection.CreateCommand();
        command.CommandText = payload.Sql;
        if (payload.Parameters is not null)
        {
            foreach (var (key, value) in payload.Parameters)
            {
                var parameter = command.CreateParameter();
                // Accept parameter names with or without the leading '@'.
                parameter.ParameterName = key.StartsWith('@') ? key : "@" + key;
                parameter.Value = JsonElementToParameterValue(value);
                command.Parameters.Add(parameter);
            }
        }

        await command.ExecuteNonQueryAsync(cancellationToken);
        return true;
    }

    /// <summary>
    /// Deserializes and validates the buffered payload. Returns null (after logging)
    /// when the JSON is malformed or lacks a connection name / SQL text.
    /// </summary>
    private CachedWritePayload? ReadPayloadOrNull(StoreAndForwardMessage message)
    {
        CachedWritePayload? payload;
        try
        {
            payload = JsonSerializer.Deserialize<CachedWritePayload>(message.PayloadJson);
        }
        catch (JsonException ex)
        {
            // Malformed JSON can never succeed on a later attempt; letting the
            // exception escape would make the engine retry it instead of parking.
            _logger.LogError(ex, "Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
            return null;
        }

        if (payload is null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql))
        {
            _logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
            return null;
        }

        return payload;
    }

    /// <summary>
    /// Maps a deserialized JSON value onto a value usable as an ADO.NET parameter:
    /// strings, integers (as <c>long</c>), other numbers (as <c>double</c>), booleans,
    /// and <see cref="DBNull.Value"/> for null/undefined. Objects and arrays are
    /// passed as their raw JSON text.
    /// </summary>
    private static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch
    {
        JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value,
        JsonValueKind.Number => element.TryGetInt64(out var l) ? l : element.GetDouble(),
        JsonValueKind.True => true,
        JsonValueKind.False => false,
        JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value,
        _ => element.GetRawText()
    };

    /// <summary>
    /// Shape of the JSON payload buffered by <see cref="CachedWriteAsync"/> and read
    /// back by <see cref="DeliverBufferedAsync"/>.
    /// </summary>
    private sealed record CachedWritePayload(
        string ConnectionName,
        string Sql,
        Dictionary<string, JsonElement>? Parameters);

    /// <summary>
    /// Looks up a database connection definition by name; the result is null-checked
    /// by every caller.
    /// </summary>
    private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync(
        string connectionName,
        CancellationToken cancellationToken)
    {
        // ExternalSystemGateway-011: name-keyed repository lookup instead of
        // fetch-all-then-filter — connection definitions are resolved on every
        // cached write / connection request, so the repository performs an indexed
        // query rather than loading every connection into memory.
        return await _repository.GetDatabaseConnectionByNameAsync(connectionName, cancellationToken);
    }
}
|