diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IDatabaseGateway.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IDatabaseGateway.cs index a39a361c..7fc72859 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IDatabaseGateway.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IDatabaseGateway.cs @@ -56,8 +56,17 @@ public interface IDatabaseGateway /// Optional SQL parameters for the statement. /// Optional name of the instance that originated the write. /// Cancellation token for the buffering operation. - /// A task that represents the asynchronous operation. - Task CachedWriteAsync( + /// + /// M2.3 (#7): an mirroring the External-System + /// API path (IExternalSystemClient.CachedCallAsync). The write is + /// attempted immediately: + /// + /// immediate success → Success=true, WasBuffered=false (not buffered); + /// permanent SQL error (constraint / syntax / permission) → Success=false, WasBuffered=false with an error message, returned synchronously and NOT buffered; + /// transient SQL error (connection / timeout / deadlock / throttle) → buffered to store-and-forward, Success=true, WasBuffered=true. + /// + /// + Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? parameters = null, diff --git a/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/DatabaseGateway.cs b/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/DatabaseGateway.cs index f2f266a9..0096fc57 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/DatabaseGateway.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/DatabaseGateway.cs @@ -75,7 +75,7 @@ public class DatabaseGateway : IDatabaseGateway new SqlConnection(connectionString); /// - public async Task CachedWriteAsync( + public async Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? 
parameters = null, @@ -97,6 +97,44 @@ public class DatabaseGateway : IDatabaseGateway throw new InvalidOperationException("Store-and-forward service not available for cached writes"); } + // M2.3 (#7): attempt the write IMMEDIATELY and classify the outcome, + // mirroring ExternalSystemClient.CachedCallAsync. The pre-M2.3 behaviour + // enqueued every write unconditionally and the S&F retry sweep then + // retried ALL failures forever — a permanent SQL error (constraint, + // syntax, permission) was never returned to the script and spun in the + // buffer indefinitely. Now: + // * success -> Delivered, NOT buffered; + // * PermanentDatabaseException -> Failed synchronously, NOT buffered; + // * TransientDatabaseException -> buffered to S&F for retry. + try + { + await ExecuteWriteAsync( + connectionName, definition.ConnectionString, sql, parameters ?? EmptyParameters, cancellationToken) + .ConfigureAwait(false); + + // Immediate success — the write is done; do not buffer. + return new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: false); + } + catch (PermanentDatabaseException ex) + { + // Permanent failures are returned to the script and never buffered — + // mirrors the PermanentExternalSystemException branch on the API path. + _logger.LogWarning( + ex, + "CachedWrite to '{Connection}' failed permanently (SQL error {Number}); returning Failed without buffering.", + connectionName, ex.SqlErrorNumber); + return new ExternalCallResult( + Success: false, ResponseJson: null, ErrorMessage: $"Permanent database error: {ex.Message}", WasBuffered: false); + } + catch (TransientDatabaseException ex) + { + // Transient failure — hand to S&F so the retry sweep delivers it. 
+ _logger.LogDebug( + ex, + "CachedWrite to '{Connection}' failed transiently (SQL error {Number}); buffering for retry.", + connectionName, ex.SqlErrorNumber); + } + var payload = JsonSerializer.Serialize(new { ConnectionName = connectionName, @@ -119,6 +157,12 @@ public class DatabaseGateway : IDatabaseGateway originInstanceName, definition.MaxRetries > 0 ? definition.MaxRetries : null, definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null, + // M2.3 (#7): attemptImmediateDelivery: false — this method already + // made the write attempt above (the transient-classified failure is + // exactly why we are buffering). Letting EnqueueAsync re-invoke the + // delivery handler would execute the same write a second time — + // mirrors ExternalSystemClient.CachedCallAsync. + attemptImmediateDelivery: false, // Audit Log #23 (M3): pin the S&F message id to the // TrackedOperationId so the retry loop (Bundle E Tasks E4/E5) can // read it back via StoreAndForwardMessage.Id and emit per-attempt + @@ -136,17 +180,29 @@ public class DatabaseGateway : IDatabaseGateway // retry-loop cached-write audit rows correlate back to the // cross-execution chain. Null for a non-routed run. parentExecutionId: parentExecutionId); + + // Buffered for retry — mirrors the API path's WasBuffered=true result. + return new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true); } /// /// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry - /// sweep — executes the SQL against the named connection. Returns true on - /// success, false if the connection no longer exists (the message is parked); - /// throws on any execution error so the engine retries. + /// sweep — executes the SQL against the named connection. /// + /// + /// M2.3 (#7): the outcome is classified, mirroring + /// . 
Returns + /// false — so the S&F engine PARKS the message — when the + /// connection no longer exists, the payload is unreadable, or the SQL fails + /// with a PERMANENT error (constraint / syntax / permission). A TRANSIENT SQL + /// error () propagates so the engine + /// retries. The pre-M2.3 code rethrew on ANY SQL error, so a permanent + /// failure on the retry path looped forever. + /// /// The buffered store-and-forward message to deliver. /// Cancellation token for the delivery operation. - /// A task that resolves to true on success, or false if the connection no longer exists. + /// A task that resolves to true on success, or false when the message must be parked. + /// Thrown on a transient SQL failure so the engine retries. public async Task DeliverBufferedAsync( StoreAndForwardMessage message, CancellationToken cancellationToken = default) { @@ -185,22 +241,93 @@ public class DatabaseGateway : IDatabaseGateway return false; } - await using var connection = new SqlConnection(definition.ConnectionString); - await connection.OpenAsync(cancellationToken); - using var command = connection.CreateCommand(); - command.CommandText = payload.Sql; - if (payload.Parameters != null) + // Materialise the buffered JsonElement parameters into CLR values once, + // then run through the shared ExecuteWriteAsync seam so both the + // immediate-attempt path and this retry path classify SqlException the + // same way. + IReadOnlyDictionary materialisedParameters = + payload.Parameters == null + ? 
EmptyParameters + : payload.Parameters.ToDictionary( + kv => kv.Key, kv => (object?)JsonElementToParameterValue(kv.Value)); + + try { - foreach (var (key, value) in payload.Parameters) + await ExecuteWriteAsync( + payload.ConnectionName, definition.ConnectionString, payload.Sql, materialisedParameters, cancellationToken) + .ConfigureAwait(false); + return true; + } + catch (PermanentDatabaseException ex) + { + // Permanent — parking is correct; retrying the identical statement + // cannot succeed. Mirrors ExternalSystemClient.DeliverBufferedAsync + // returning false on PermanentExternalSystemException. + _logger.LogError( + ex, + "Buffered DB write to '{Connection}' failed permanently (SQL error {Number}); parking.", + payload.ConnectionName, ex.SqlErrorNumber); + return false; + } + // TransientDatabaseException propagates — the S&F engine retries. + } + + /// + /// Reusable empty parameter map so the no-parameter paths do not allocate a + /// fresh dictionary each call. + /// + private static readonly IReadOnlyDictionary EmptyParameters = + new Dictionary(); + + /// + /// M2.3 (#7): executes a parameterised SQL write against the given connection + /// string and classifies any into + /// / + /// via . This is the single SQL-execution seam + /// shared by the immediate attempt and the + /// retry path. Marked internal virtual + /// so tests can substitute success / transient / permanent outcomes without a + /// real SQL Server (and without fabricating a , which + /// has no public constructor). Mirrors the role of + /// on the API path. + /// + /// The human-readable connection name, used only for the classified error message (never the connection string — that would leak credentials into logs / script-visible errors). + /// The ADO.NET connection string to write through. + /// The SQL statement to execute. + /// Materialised CLR parameter values (may be empty). + /// Cancellation token for the write. + /// A task that completes when the write succeeds. 
+ /// Thrown for a transient SQL error number. + /// Thrown for a permanent (or unknown) SQL error number. + internal virtual async Task ExecuteWriteAsync( + string connectionName, + string connectionString, + string sql, + IReadOnlyDictionary parameters, + CancellationToken cancellationToken) + { + try + { + await using var connection = new SqlConnection(connectionString); + await connection.OpenAsync(cancellationToken).ConfigureAwait(false); + using var command = connection.CreateCommand(); + command.CommandText = sql; + foreach (var (key, value) in parameters) { var parameter = command.CreateParameter(); parameter.ParameterName = key.StartsWith('@') ? key : "@" + key; - parameter.Value = JsonElementToParameterValue(value); + parameter.Value = value ?? DBNull.Value; command.Parameters.Add(parameter); } + await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + } + catch (SqlException ex) + { + // Classify by SqlException.Number and rethrow as the strongly-typed + // transient / permanent failure the callers branch on. The context + // is the connection NAME, never the connection string. + throw SqlErrorClassifier.Throw(connectionName, ex); } - await command.ExecuteNonQueryAsync(cancellationToken); - return true; } // ExternalSystemGateway-020: a JSON number that does not fit in Int64 must diff --git a/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/SqlErrorClassifier.cs b/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/SqlErrorClassifier.cs new file mode 100644 index 00000000..f1e7a89f --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway/SqlErrorClassifier.cs @@ -0,0 +1,160 @@ +using Microsoft.Data.SqlClient; + +namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway; + +/// +/// M2.3 (#7): classifies a SQL Server failure as transient (a brief wait / +/// retry may succeed — buffer to store-and-forward) or permanent (the identical +/// statement cannot succeed — return to the script / park the buffered message). 
+/// +/// +/// +/// This is the database-side parallel of the External-System error classifier (the +/// HTTP path). The two are kept separate because the inputs differ: HTTP keys +/// off status codes / exception types, SQL keys off +/// SqlException.Number. +/// +/// +/// Transient set. Only connection-loss, timeout, deadlock, and Azure SQL +/// throttle/availability error numbers are transient — failures whose cause is +/// external to the statement and may clear on its own: +/// +/// -2 — query / command timeout expired. +/// -1 — a connection-level error (general SqlClient connection failure). +/// 2 — SQL Server / network instance not found or not accessible. +/// 53 — network path to the server was not found. +/// 64 — connection terminated mid-session (transport error). +/// 233 — no process on the other end of the named pipe. +/// 1205 — the session was chosen as a deadlock victim. +/// 10053 — transport-level abort (software caused connection abort). +/// 10054 — connection reset by peer. +/// 10060 — connection attempt timed out. +/// 40197 — Azure SQL service error processing the request; retry. +/// 40501 — Azure SQL service is busy. +/// 40613 — Azure SQL database is currently unavailable. +/// 49918 / 49919 / 49920 — Azure SQL throttling (too many requests / operations). +/// +/// +/// +/// Everything else is permanent. Constraint violations (547, 2627, 2601), +/// syntax errors (102, 156, 207, 208), and permission errors (229, 230, 262) are +/// the obvious permanent cases, but the policy is broader: any error number not +/// in the transient set — including unknown / undocumented / ambiguous numbers — +/// is treated as permanent. Fail-fast is the safer default: silently +/// retrying an unrecognised error forever (the pre-M2.3 behaviour) hides +/// authoring bugs and can replay duplicate side effects. A genuinely transient +/// number we have not enumerated will, at worst, surface to the script as a +/// permanent failure — a loud, fixable outcome — rather than spin in an +/// unbounded retry loop. 
+/// +/// +public static class SqlErrorClassifier +{ + /// + /// The complete set of SQL Server error numbers treated as transient. See the + /// type-level remarks for the per-number rationale. Anything outside this set + /// is permanent. + /// + private static readonly HashSet TransientErrorNumbers = new() + { + -2, -1, 2, 53, 64, 233, 1205, + 10053, 10054, 10060, + 40197, 40501, 40613, + 49918, 49919, 49920, + }; + + /// + /// Determines whether a SQL Server error number represents a transient + /// failure. Unknown / undocumented numbers default to permanent + /// () — see the type-level remarks. + /// + /// The SQL Server error number (e.g. ). + /// if the number is in the transient set; otherwise . + public static bool IsTransient(int errorNumber) => TransientErrorNumbers.Contains(errorNumber); + + /// + /// Determines whether a represents a transient + /// failure by classifying its top-level . + /// + /// The SQL exception to classify. + /// if the exception's error number is transient; otherwise . + public static bool IsTransient(SqlException exception) + { + ArgumentNullException.ThrowIfNull(exception); + return IsTransient(exception.Number); + } + + /// + /// Classifies a and rethrows it as the matching + /// strongly-typed failure: for a + /// transient error number, otherwise. + /// Mirrors + /// + the throw of on the HTTP + /// path — the callers then branch on the typed exception rather than on the + /// raw . + /// + /// A short human-readable description of the failing operation (e.g. the connection name). + /// The SQL exception to classify and wrap. + /// This method never returns normally — it always throws. + /// Thrown when the error number is transient. + /// Thrown when the error number is permanent (the default). 
+ public static Exception Throw(string context, SqlException exception) + { + ArgumentNullException.ThrowIfNull(exception); + + if (IsTransient(exception)) + { + throw new TransientDatabaseException( + $"Transient SQL error {exception.Number} on {context}: {exception.Message}", + exception.Number, + exception); + } + + throw new PermanentDatabaseException( + $"Permanent SQL error {exception.Number} on {context}: {exception.Message}", + exception.Number, + exception); + } +} + +/// +/// Signals a transient database failure suitable for store-and-forward retry — +/// the SQL-path parallel of . +/// +public class TransientDatabaseException : Exception +{ + /// Gets the SQL Server error number that caused the failure, if known. + public int? SqlErrorNumber { get; } + + /// Initializes a new . + /// The error message. + /// The SQL Server error number, if available. + /// Optional inner exception (typically the original ). + public TransientDatabaseException(string message, int? errorNumber = null, Exception? innerException = null) + : base(message, innerException) + { + SqlErrorNumber = errorNumber; + } +} + +/// +/// Signals a permanent database failure that must not be retried — the SQL-path +/// parallel of . Returned +/// synchronously to the calling script on the immediate attempt and parks the +/// message on the store-and-forward retry path. +/// +public class PermanentDatabaseException : Exception +{ + /// Gets the SQL Server error number that caused the failure, if known. + public int? SqlErrorNumber { get; } + + /// Initializes a new . + /// The error message. + /// The SQL Server error number, if available. + /// Optional inner exception (typically the original ). + public PermanentDatabaseException(string message, int? errorNumber = null, Exception? 
innerException = null) + : base(message, innerException) + { + SqlErrorNumber = errorNumber; + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs index e52672c5..4bf48d21 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -1326,9 +1326,20 @@ public class ScriptRuntimeContext name, trackedId, target, occurredAtUtc, cancellationToken) .ConfigureAwait(false); + // M2.3 (#7): the gateway now attempts the write immediately and + // classifies the outcome (mirroring ExternalSystem.CachedCall). The + // result is retained because the immediate paths (WasBuffered=false — + // immediate success OR a synchronous permanent failure) bypass the + // S&F retry loop entirely, so no retry-loop telemetry ever fires. + // This helper must emit the Attempted + CachedResolve terminal rows + // itself, otherwise Tracking.Status(id) would stay Submitted forever + // and the audit log would be missing the terminal lifecycle. The + // WasBuffered=true path is unaffected — the S&F retry loop owns the + // Attempted + Resolve emissions there. + ExternalCallResult? result; try { - await _gateway.CachedWriteAsync( + result = await _gateway.CachedWriteAsync( name, sql, parameters, _instanceName, cancellationToken, trackedId, // Audit Log #23 (ExecutionId Task 4): thread the script // execution's ExecutionId + SourceScript so a buffered @@ -1350,9 +1361,148 @@ public class ScriptRuntimeContext throw; } + // M2.3 (#7): immediate-completion lifecycle — emit the missing + // Attempted + CachedResolve rows when the underlying write resolved + // without engaging the store-and-forward retry loop (immediate + // success or a synchronous permanent failure). 
+ if (result is { WasBuffered: false }) + { + await EmitImmediateDbTerminalTelemetryAsync( + name, target, trackedId, result, cancellationToken) + .ConfigureAwait(false); + } + return trackedId; } + /// + /// M2.3 (#7): best-effort emission of the immediate-completion lifecycle + /// for a Database.CachedWrite that resolved without the S&F + /// retry loop — emits an Attempted row then a terminal + /// CachedResolve row (Delivered on success, Failed on + /// a synchronous permanent SQL error). The DB parallel of + /// . Any forwarder + /// failure is logged and swallowed (alog.md §7). + /// + private async Task EmitImmediateDbTerminalTelemetryAsync( + string connectionName, + string target, + TrackedOperationId trackedId, + ExternalCallResult result, + CancellationToken cancellationToken) + { + if (_cachedForwarder == null) + { + return; + } + + var occurredAtUtc = DateTime.UtcNow; + + // Status mapping mirrors the API path: success -> Delivered, a + // synchronous permanent failure -> Failed. A transient failure never + // reaches here (WasBuffered=true), so "the immediate attempt failed + // and the operation is done" always means a permanent failure. + var auditTerminalStatus = result.Success ? AuditStatus.Delivered : AuditStatus.Failed; + var operationalTerminalStatus = result.Success ? "Delivered" : "Failed"; + + // --- Attempted row ------------------------------------------------- + CachedCallTelemetry? 
attempted = TryBuildDbTerminalTelemetry( + connectionName, target, trackedId, occurredAtUtc, + AuditKind.DbWriteCached, AuditStatus.Attempted, "Attempted", + result, isTerminal: false); + + if (attempted is not null) + { + try + { + await _cachedForwarder.ForwardAsync(attempted, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Immediate-Attempted telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})", + connectionName, trackedId); + } + } + + // --- CachedResolve row -------------------------------------------- + CachedCallTelemetry? resolve = TryBuildDbTerminalTelemetry( + connectionName, target, trackedId, occurredAtUtc, + AuditKind.CachedResolve, auditTerminalStatus, operationalTerminalStatus, + result, isTerminal: true); + + if (resolve is not null) + { + try + { + await _cachedForwarder.ForwardAsync(resolve, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Immediate-CachedResolve telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})", + connectionName, trackedId); + } + } + } + + /// + /// Builds one immediate-completion DbOutbound telemetry packet, or + /// returns null (and logs) when construction throws — so a build + /// failure skips emission rather than aborting the script. + /// + private CachedCallTelemetry? 
TryBuildDbTerminalTelemetry( + string connectionName, + string target, + TrackedOperationId trackedId, + DateTime occurredAtUtc, + AuditKind kind, + AuditStatus auditStatus, + string operationalStatus, + ExternalCallResult result, + bool isTerminal) + { + try + { + return new CachedCallTelemetry( + Audit: ScadaBridgeAuditEventFactory.Create( + channel: AuditChannel.DbOutbound, + kind: kind, + status: auditStatus, + occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), + target: target, + correlationId: trackedId.Value, + executionId: _executionId, + parentExecutionId: _parentExecutionId, + sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId, + sourceInstanceId: _instanceName, + sourceScript: _sourceScript, + errorMessage: result.Success ? null : result.ErrorMessage), + Operational: new SiteCallOperational( + TrackedOperationId: trackedId, + Channel: "DbOutbound", + Target: target, + SourceSite: _siteId, + SourceNode: _sourceNode, + Status: operationalStatus, + RetryCount: 0, + LastError: result.Success ? null : result.ErrorMessage, + HttpStatus: null, + CreatedAtUtc: occurredAtUtc, + UpdatedAtUtc: occurredAtUtc, + TerminalAtUtc: isTerminal ? 
occurredAtUtc : null)); + } + catch (Exception buildEx) + { + _logger.LogWarning(buildEx, + "Failed to build immediate-{Kind} telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission", + kind, connectionName, trackedId); + return null; + } + } + private async Task EmitCachedDbSubmitTelemetryAsync( string connectionName, TrackedOperationId trackedId, diff --git a/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs index a7388e67..46d18014 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs @@ -100,7 +100,14 @@ public class DatabaseGatewayTests var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService( storage, sfOptions, NullLogger.Instance); - var gateway = new DatabaseGateway(_repository, NullLogger.Instance, storeAndForward: sf); + // M2.3 (#7): CachedWriteAsync now attempts the write immediately and + // only buffers on a TRANSIENT failure. The stub forces a transient + // outcome so this test exercises the buffering path deterministically + // without a real SQL Server. + var gateway = new ExecuteStubGateway( + _repository, + sf, + onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205)); // Audit Log #23 (ExecutionId Task 4): a known execution id / source // script so the gateway -> EnqueueAsync hop can be asserted below. @@ -157,7 +164,11 @@ public class DatabaseGatewayTests var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService( storage, sfOptions, NullLogger.Instance); - var gateway = new DatabaseGateway(_repository, NullLogger.Instance, storeAndForward: sf); + // M2.3 (#7): force a transient outcome so the write reaches S&F. 
+ var gateway = new ExecuteStubGateway( + _repository, + sf, + onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205)); await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"); @@ -167,6 +178,219 @@ public class DatabaseGatewayTests Assert.NotEqual(0, maxRetries); } + // ── M2.3 (#7): transient-vs-permanent SQL classification on the immediate + // cached-write attempt + the buffered retry path ── + + /// + /// Builds a real, initialised in-memory store-and-forward service plus a + /// keep-alive connection (the SQLite shared-cache DB lives only while a + /// connection is open). The caller disposes . + /// + private static (ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService Sf, string ConnStr, Microsoft.Data.Sqlite.SqliteConnection KeepAlive) + NewStoreAndForward() + { + var dbName = $"EsgCachedWriteClassify_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + var keepAlive = new Microsoft.Data.Sqlite.SqliteConnection(connStr); + keepAlive.Open(); + var storage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage( + connStr, NullLogger.Instance); + storage.InitializeAsync().GetAwaiter().GetResult(); + var sfOptions = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardOptions + { + DefaultMaxRetries = 99, + DefaultRetryInterval = TimeSpan.FromMinutes(10), + RetryTimerInterval = TimeSpan.FromMinutes(10), + }; + var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService( + storage, sfOptions, NullLogger.Instance); + return (sf, connStr, keepAlive); + } + + [Fact] + public async Task CachedWrite_PermanentSqlError_ReturnsFailedSynchronously_NotBuffered() + { + // A constraint/syntax/permission failure on the IMMEDIATE attempt must + // be returned to the script as Failed and must NOT be buffered — mirrors + // ExternalSystemClient.CachedCallAsync's PermanentExternalSystemException + // path. 
+ var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 }; + StubConnection(conn); + + var (sf, connStr, keepAlive) = NewStoreAndForward(); + using var _ = keepAlive; + + var gateway = new ExecuteStubGateway( + _repository, + sf, + onExecute: () => throw new PermanentDatabaseException( + "Violation of PRIMARY KEY constraint", errorNumber: 2627)); + + var result = await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"); + + Assert.False(result.Success); + Assert.False(result.WasBuffered); + Assert.NotNull(result.ErrorMessage); + + // Nothing buffered — the permanent failure short-circuited S&F. + Assert.Equal(0, ReadBufferDepth(connStr)); + } + + [Fact] + public async Task CachedWrite_TransientSqlError_BuffersToStoreAndForward() + { + // A deadlock / timeout on the IMMEDIATE attempt is transient — the write + // is handed to S&F (WasBuffered=true), not returned as Failed. + var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") + { + Id = 1, + MaxRetries = 5, + RetryDelay = TimeSpan.FromSeconds(12), + }; + StubConnection(conn); + + var (sf, connStr, keepAlive) = NewStoreAndForward(); + using var _ = keepAlive; + + var gateway = new ExecuteStubGateway( + _repository, + sf, + onExecute: () => throw new TransientDatabaseException( + "Transaction was deadlocked", errorNumber: 1205)); + + var result = await gateway.CachedWriteAsync( + "testDb", "UPDATE t SET v = 1", new Dictionary { ["x"] = 1 }); + + Assert.True(result.Success); // accepted for delivery + Assert.True(result.WasBuffered); // handed to S&F, not synchronously failed + Assert.Null(result.ErrorMessage); + + Assert.Equal(1, ReadBufferDepth(connStr)); + } + + [Fact] + public async Task CachedWrite_ImmediateSuccess_NotBuffered_ReturnsDelivered() + { + // A write that succeeds immediately is done — it must NOT be buffered, + // and the result reports success (WasBuffered=false), mirroring the API + // path's immediate-success 
behaviour. + var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 }; + StubConnection(conn); + + var (sf, connStr, keepAlive) = NewStoreAndForward(); + using var _ = keepAlive; + + var gateway = new ExecuteStubGateway(_repository, sf, onExecute: () => { /* succeeds */ }); + + var result = await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"); + + Assert.True(result.Success); + Assert.False(result.WasBuffered); + Assert.Null(result.ErrorMessage); + + Assert.Equal(0, ReadBufferDepth(connStr)); + } + + [Fact] + public async Task DeliverBuffered_TransientSqlError_RethrowsSoEngineRetries() + { + // On the retry path a transient failure must propagate so the S&F engine + // schedules another retry — mirrors ExternalSystemClient.DeliverBuffered + // letting TransientExternalSystemException escape. + var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 }; + StubConnection(conn); + + var gateway = new ExecuteStubGateway( + _repository, + storeAndForward: null, + onExecute: () => throw new TransientDatabaseException("timeout", errorNumber: -2)); + + var message = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite, + Target = "testDb", + PayloadJson = + """{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""", + }; + + await Assert.ThrowsAsync( + () => gateway.DeliverBufferedAsync(message)); + } + + [Fact] + public async Task DeliverBuffered_PermanentSqlError_ReturnsFalseSoMessageParks() + { + // On the retry path a permanent failure must park the message (return + // false) rather than retry forever — mirrors ExternalSystemClient. + // DeliverBuffered returning false on PermanentExternalSystemException. 
+ var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 }; + StubConnection(conn); + + var gateway = new ExecuteStubGateway( + _repository, + storeAndForward: null, + onExecute: () => throw new PermanentDatabaseException( + "Invalid column name", errorNumber: 207)); + + var message = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite, + Target = "testDb", + PayloadJson = + """{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""", + }; + + var delivered = await gateway.DeliverBufferedAsync(message); + + Assert.False(delivered); // permanent — the S&F engine parks the message + } + + /// + /// Reads the current buffered-message count off the S&F SQLite DB by + /// counting sf_messages rows (the engine's persistence table). + /// + private static int ReadBufferDepth(string connStr) + { + using var conn = new Microsoft.Data.Sqlite.SqliteConnection(connStr); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM sf_messages"; + return Convert.ToInt32(cmd.ExecuteScalar()); + } + + /// + /// Test gateway that substitutes the SQL-execution seam so a test can drive + /// success / transient / permanent outcomes without a real SQL Server (and + /// without fabricating a , + /// which has no public constructor). Production classifies a real + /// SqlException into / + /// at this same seam. + /// + private sealed class ExecuteStubGateway : DatabaseGateway + { + private readonly Action _onExecute; + + public ExecuteStubGateway( + IExternalSystemRepository repository, + ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService? 
storeAndForward, + Action onExecute) + : base(repository, NullLogger.Instance, storeAndForward) + => _onExecute = onExecute; + + internal override Task ExecuteWriteAsync( + string connectionName, + string connectionString, + string sql, + IReadOnlyDictionary parameters, + CancellationToken cancellationToken) + { + _onExecute(); + return Task.CompletedTask; + } + } + private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript) ReadBufferedRetrySettings(string connStr) { diff --git a/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/SqlErrorClassifierTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/SqlErrorClassifierTests.cs new file mode 100644 index 00000000..406013d6 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/SqlErrorClassifierTests.cs @@ -0,0 +1,65 @@ +namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests; + +/// +/// M2.3 (#7): unit tests for the transient-vs-permanent SQL error-number +/// classifier that DatabaseGateway uses to decide whether a failed +/// cached write should be buffered (transient) or returned to the script +/// synchronously / parked (permanent). +/// +public class SqlErrorClassifierTests +{ + // The full transient set documented on SqlErrorClassifier — connection, + // timeout, deadlock, and Azure throttle error numbers. A retry can plausibly + // succeed for any of these, so they are buffered to store-and-forward. 
+ [Theory] + [InlineData(-2)] // timeout expired + [InlineData(-1)] // connection error + [InlineData(2)] // network / instance not found + [InlineData(53)] // network path not found + [InlineData(64)] // connection terminated mid-session + [InlineData(233)] // no process on the other end of the pipe + [InlineData(1205)] // deadlock victim + [InlineData(10053)] // transport-level abort + [InlineData(10054)] // connection reset by peer + [InlineData(10060)] // connection timed out + [InlineData(40197)] // Azure SQL service error, retry + [InlineData(40501)] // Azure SQL service busy + [InlineData(40613)] // Azure SQL database unavailable + [InlineData(49918)] // Azure SQL cannot process request (throttle) + [InlineData(49919)] // Azure SQL too many create/update operations + [InlineData(49920)] // Azure SQL too many operations (throttle) + public void IsTransient_KnownTransientNumber_ReturnsTrue(int errorNumber) + { + Assert.True(SqlErrorClassifier.IsTransient(errorNumber)); + } + + // Constraint, syntax, and permission errors are permanent — retrying the + // identical statement cannot succeed and may cause duplicate side effects. + [Theory] + [InlineData(547)] // constraint violation (FK/CHECK) + [InlineData(2627)] // primary-key / unique constraint violation + [InlineData(2601)] // duplicate key in a unique index + [InlineData(102)] // incorrect syntax + [InlineData(156)] // incorrect syntax near a keyword + [InlineData(207)] // invalid column name + [InlineData(208)] // invalid object name + [InlineData(229)] // permission denied on object + [InlineData(230)] // permission denied on column + [InlineData(262)] // permission denied (CREATE etc.) 
+ public void IsTransient_KnownPermanentNumber_ReturnsFalse(int errorNumber) + { + Assert.False(SqlErrorClassifier.IsTransient(errorNumber)); + } + + [Theory] + [InlineData(0)] // no error number captured + [InlineData(99999)] // unknown / undocumented number + [InlineData(12345)] + [InlineData(int.MaxValue)] + public void IsTransient_UnknownNumber_DefaultsToPermanent(int errorNumber) + { + // Fail-fast is the safer default: an unrecognised error number must NOT + // be silently retried forever. Unknown => permanent => false. + Assert.False(SqlErrorClassifier.IsTransient(errorNumber)); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs index ba62aee5..e0c8f8b6 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs @@ -77,7 +77,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -118,7 +123,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. 
The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -147,7 +157,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId); @@ -169,7 +184,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. 
+ .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -207,7 +227,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -248,7 +273,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId); @@ -281,7 +311,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. 
The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -310,7 +345,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder { ThrowOnForward = new InvalidOperationException("simulated forwarder outage"), @@ -348,7 +388,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. 
+ .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = new ScriptRuntimeContext.DatabaseHelper( @@ -384,7 +429,12 @@ public class DatabaseCachedWriteEmissionTests It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + // M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The + // buffered result (WasBuffered=true) models the transient-failure + // path these enqueue-telemetry tests exercise — only the CachedSubmit + // packet is emitted; the S&F retry loop (not the helper) owns the + // terminal rows, so Assert.Single(forwarder.Telemetry) still holds. + .ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); var forwarder = new CapturingForwarder(); var helper = CreateHelper(gateway.Object, forwarder); @@ -393,4 +443,97 @@ public class DatabaseCachedWriteEmissionTests var packet = Assert.Single(forwarder.Telemetry); Assert.Null(packet.Operational.SourceNode); } + + // ── M2.3 (#7): immediate-completion lifecycle (WasBuffered=false) ── + + private static Mock GatewayReturning(ExternalCallResult result) + { + var gateway = new Mock(); + gateway + .Setup(g => g.CachedWriteAsync( + It.IsAny(), It.IsAny(), + It.IsAny?>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(result); + return gateway; + } + + [Fact] + public async Task CachedWrite_ImmediateSuccess_EmitsSubmitAttemptedThenDeliveredResolve() + { + // An immediate success (WasBuffered=false) bypasses the S&F retry loop, + // so the helper itself must emit the Attempted + terminal CachedResolve + // rows — mirroring ExternalSystem.CachedCall's immediate-success path. 
+ var gateway = GatewayReturning( + new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: false)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(gateway.Object, forwarder); + var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)"); + + Assert.Equal(3, forwarder.Telemetry.Count); + + var submit = forwarder.Telemetry[0].Audit.AsRow(); + Assert.Equal(AuditKind.CachedSubmit, submit.Kind); + Assert.Equal(AuditStatus.Submitted, submit.Status); + + var attempted = forwarder.Telemetry[1].Audit.AsRow(); + Assert.Equal(AuditChannel.DbOutbound, attempted.Channel); + Assert.Equal(AuditKind.DbWriteCached, attempted.Kind); + Assert.Equal(AuditStatus.Attempted, attempted.Status); + + var resolve = forwarder.Telemetry[2]; + Assert.Equal(AuditChannel.DbOutbound, resolve.Audit.AsRow().Channel); + Assert.Equal(AuditKind.CachedResolve, resolve.Audit.AsRow().Kind); + Assert.Equal(AuditStatus.Delivered, resolve.Audit.AsRow().Status); + Assert.Equal(trackedId.Value, resolve.Audit.AsRow().CorrelationId); + Assert.Equal("Delivered", resolve.Operational.Status); + Assert.NotNull(resolve.Operational.TerminalAtUtc); + } + + [Fact] + public async Task CachedWrite_ImmediatePermanentFailure_EmitsSubmitAttemptedThenFailedResolve() + { + // A synchronous permanent SQL failure (Success=false, WasBuffered=false) + // also bypasses S&F; the terminal CachedResolve must be Failed and the + // error message must propagate onto the row. 
+ const string error = "Permanent database error: Permanent SQL error 2627 on myDb: ..."; + var gateway = GatewayReturning( + new ExternalCallResult(Success: false, ResponseJson: null, ErrorMessage: error, WasBuffered: false)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(gateway.Object, forwarder); + var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)"); + + Assert.Equal(3, forwarder.Telemetry.Count); + + var resolve = forwarder.Telemetry[2]; + Assert.Equal(AuditKind.CachedResolve, resolve.Audit.AsRow().Kind); + Assert.Equal(AuditStatus.Failed, resolve.Audit.AsRow().Status); + Assert.Equal(error, resolve.Audit.AsRow().ErrorMessage); + Assert.Equal("Failed", resolve.Operational.Status); + Assert.Equal(error, resolve.Operational.LastError); + Assert.NotNull(resolve.Operational.TerminalAtUtc); + Assert.NotEqual(default, trackedId); + } + + [Fact] + public async Task CachedWrite_BufferedTransient_EmitsOnlySubmit_NoTerminalRows() + { + // The WasBuffered=true path must NOT emit Attempted / CachedResolve — the + // S&F retry loop owns those. Only the CachedSubmit row is emitted by the + // helper. + var gateway = GatewayReturning( + new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(gateway.Object, forwarder); + await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)"); + + var packet = Assert.Single(forwarder.Telemetry); + Assert.Equal(AuditKind.CachedSubmit, packet.Audit.AsRow().Kind); + } }