using System.Data.Common; using System.Text.Json; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using ZB.MOM.WW.ScadaBridge.Commons.Entities.ExternalSystems; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.StoreAndForward; namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway; /// /// WP-9: Database access from scripts. /// Database.Connection("name") — returns ADO.NET SqlConnection (connection pooling). /// Database.CachedWrite("name", "sql", params) — submits to S&F engine. /// public class DatabaseGateway : IDatabaseGateway { private readonly IExternalSystemRepository _repository; private readonly StoreAndForwardService? _storeAndForward; private readonly ILogger _logger; /// /// Initializes a new instance of . /// /// Repository for resolving database connection definitions. /// Logger for diagnostics. /// Optional store-and-forward service for cached writes; null disables buffering. public DatabaseGateway( IExternalSystemRepository repository, ILogger logger, StoreAndForwardService? storeAndForward = null) { _repository = repository; _logger = logger; _storeAndForward = storeAndForward; } /// public async Task GetConnectionAsync( string connectionName, CancellationToken cancellationToken = default) { var definition = await ResolveConnectionAsync(connectionName, cancellationToken); if (definition == null) { throw new InvalidOperationException($"Database connection '{connectionName}' not found"); } var connection = CreateConnection(definition.ConnectionString); try { await connection.OpenAsync(cancellationToken); } catch { // OpenAsync failed (unreachable server, bad credentials, cancellation) — // dispose the just-created connection before the exception propagates so // it is not leaked (ExternalSystemGateway-010). await connection.DisposeAsync(); throw; } return connection; } /// /// Creates the underlying ADO.NET connection for a connection string. Virtual so /// tests can substitute a connection whose OpenAsync fails. /// /// The ADO.NET connection string. /// A new for the given connection string. internal virtual DbConnection CreateConnection(string connectionString) => new SqlConnection(connectionString); /// public async Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, CancellationToken cancellationToken = default, TrackedOperationId? trackedOperationId = null, Guid? executionId = null, string? sourceScript = null, Guid? parentExecutionId = null) { var definition = await ResolveConnectionAsync(connectionName, cancellationToken); if (definition == null) { throw new InvalidOperationException($"Database connection '{connectionName}' not found"); } if (_storeAndForward == null) { throw new InvalidOperationException("Store-and-forward service not available for cached writes"); } var payload = JsonSerializer.Serialize(new { ConnectionName = connectionName, Sql = sql, Parameters = parameters }); // ExternalSystemGateway-015: the entity's MaxRetries is a non-nullable int // whose default is 0, and the Store-and-Forward engine interprets a stored // MaxRetries of 0 as "no limit" (retry forever) — see // StoreAndForwardMessage.MaxRetries ("0 = no limit") and the retry-sweep // guard `MaxRetries > 0 && ...`. Passing 0 verbatim would turn every // unconfigured cached write into an unbounded retry loop. A 0 is treated as // "unset" and passed as null so the bounded S&F default applies; the // RetryDelay default of TimeSpan.Zero is likewise unset. await _storeAndForward.EnqueueAsync( StoreAndForwardCategory.CachedDbWrite, connectionName, payload, originInstanceName, definition.MaxRetries > 0 ? definition.MaxRetries : null, definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null, // Audit Log #23 (M3): pin the S&F message id to the // TrackedOperationId so the retry loop (Bundle E Tasks E4/E5) can // read it back via StoreAndForwardMessage.Id and emit per-attempt + // terminal cached-write telemetry. Null -> S&F mints its own GUID // (legacy pre-M3 behaviour). messageId: trackedOperationId?.ToString(), // Audit Log #23 (ExecutionId Task 4): thread the originating script // execution's ExecutionId + SourceScript onto the buffered row so // the retry-loop cached-write audit rows carry the same provenance // the script-side cached rows do. executionId: executionId, sourceScript: sourceScript, // Audit Log #23 (ParentExecutionId Task 6): thread the spawning // inbound-API request's ExecutionId onto the buffered row so the // retry-loop cached-write audit rows correlate back to the // cross-execution chain. Null for a non-routed run. parentExecutionId: parentExecutionId); } /// /// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry /// sweep — executes the SQL against the named connection. Returns true on /// success, false if the connection no longer exists (the message is parked); /// throws on any execution error so the engine retries. /// /// The buffered store-and-forward message to deliver. /// Cancellation token for the delivery operation. /// A task that resolves to true on success, or false if the connection no longer exists. public async Task DeliverBufferedAsync( StoreAndForwardMessage message, CancellationToken cancellationToken = default) { // ExternalSystemGateway-018: a malformed (not just empty/null-fielded) // PayloadJson would otherwise throw `JsonException` here, which the S&F // engine treats as a transient failure and retries forever (poison // message). Re-running the same deserialization against the same payload // will throw deterministically, so JsonException is permanent — log, // and return false so the S&F engine parks the message instead. CachedWritePayload? payload; try { payload = JsonSerializer.Deserialize(message.PayloadJson); } catch (JsonException ex) { _logger.LogError( ex, "Buffered CachedDbWrite message {Id} has malformed JSON payload; parking.", message.Id); return false; } if (payload == null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql)) { _logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id); return false; } var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken); if (definition == null) { _logger.LogError( "Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.", payload.ConnectionName); return false; } await using var connection = new SqlConnection(definition.ConnectionString); await connection.OpenAsync(cancellationToken); using var command = connection.CreateCommand(); command.CommandText = payload.Sql; if (payload.Parameters != null) { foreach (var (key, value) in payload.Parameters) { var parameter = command.CreateParameter(); parameter.ParameterName = key.StartsWith('@') ? key : "@" + key; parameter.Value = JsonElementToParameterValue(value); command.Parameters.Add(parameter); } } await command.ExecuteNonQueryAsync(cancellationToken); return true; } // ExternalSystemGateway-020: a JSON number that does not fit in Int64 must // prefer decimal over double — a script's decimal SQL parameter is // serialised as JSON without a type tag, and downcasting it to double on // the cached-write retry path silently loses precision (e.g. // 1234567890.1234567890 -> 1234567890.1234567 as a binary float). Probe // long first (whole-number fast path), then decimal (preserves authored // precision for typical money/measurement values), and only fall through // to double for genuinely out-of-decimal-range values (very large // scientific-notation floats). /// /// Converts a to the most appropriate SQL parameter value, /// preferring longdecimaldouble for numeric kinds to preserve precision. /// /// The JSON element to convert. /// A boxed CLR value suitable for use as an ADO.NET parameter, or for null/undefined. internal static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch { JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value, JsonValueKind.Number => element.TryGetInt64(out var l) ? l : element.TryGetDecimal(out var dec) ? dec : element.GetDouble(), JsonValueKind.True => true, JsonValueKind.False => false, JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value, _ => element.GetRawText() }; private sealed record CachedWritePayload( string ConnectionName, string Sql, Dictionary? Parameters); private async Task ResolveConnectionAsync( string connectionName, CancellationToken cancellationToken) { // ExternalSystemGateway-011: name-keyed repository lookup instead of // fetch-all-then-filter — connection definitions are resolved on every // cached write / connection request, so the repository performs an indexed // query rather than loading every connection into memory. return await _repository.GetDatabaseConnectionByNameAsync(connectionName, cancellationToken); } }