fix(db): classify transient vs permanent SQL errors in Database.CachedWrite (#7)
CachedWrite buffered ALL write failures and retried forever, never returning a synchronous failure to the script — permanent SQL errors (constraint/syntax/ permission) were treated as transient. Mirror the External-System API path: attempt immediately, return Failed synchronously on permanent SQL errors (no buffering), buffer only transient errors; the S&F retry path parks permanent failures instead of retrying forever. New SqlErrorClassifier + PermanentDatabaseException.
This commit is contained in:
@@ -56,8 +56,17 @@ public interface IDatabaseGateway
|
||||
/// <param name="parameters">Optional SQL parameters for the statement.</param>
|
||||
/// <param name="originInstanceName">Optional name of the instance that originated the write.</param>
|
||||
/// <param name="cancellationToken">Cancellation token for the buffering operation.</param>
|
||||
/// <returns>A task that represents the asynchronous operation.</returns>
|
||||
Task CachedWriteAsync(
|
||||
/// <returns>
|
||||
/// M2.3 (#7): an <see cref="ExternalCallResult"/> mirroring the External-System
|
||||
/// API path (<c>IExternalSystemClient.CachedCallAsync</c>). The write is
|
||||
/// attempted immediately:
|
||||
/// <list type="bullet">
|
||||
/// <item>immediate success → <c>Success=true, WasBuffered=false</c> (not buffered);</item>
|
||||
/// <item>permanent SQL error (constraint / syntax / permission) → <c>Success=false, WasBuffered=false</c> with an error message, returned synchronously and NOT buffered;</item>
|
||||
/// <item>transient SQL error (connection / timeout / deadlock / throttle) → buffered to store-and-forward, <c>Success=true, WasBuffered=true</c>.</item>
|
||||
/// </list>
|
||||
/// </returns>
|
||||
Task<ExternalCallResult> CachedWriteAsync(
|
||||
string connectionName,
|
||||
string sql,
|
||||
IReadOnlyDictionary<string, object?>? parameters = null,
|
||||
|
||||
@@ -75,7 +75,7 @@ public class DatabaseGateway : IDatabaseGateway
|
||||
new SqlConnection(connectionString);
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task CachedWriteAsync(
|
||||
public async Task<ExternalCallResult> CachedWriteAsync(
|
||||
string connectionName,
|
||||
string sql,
|
||||
IReadOnlyDictionary<string, object?>? parameters = null,
|
||||
@@ -97,6 +97,44 @@ public class DatabaseGateway : IDatabaseGateway
|
||||
throw new InvalidOperationException("Store-and-forward service not available for cached writes");
|
||||
}
|
||||
|
||||
// M2.3 (#7): attempt the write IMMEDIATELY and classify the outcome,
|
||||
// mirroring ExternalSystemClient.CachedCallAsync. The pre-M2.3 behaviour
|
||||
// enqueued every write unconditionally and the S&F retry sweep then
|
||||
// retried ALL failures forever — a permanent SQL error (constraint,
|
||||
// syntax, permission) was never returned to the script and spun in the
|
||||
// buffer indefinitely. Now:
|
||||
// * success -> Delivered, NOT buffered;
|
||||
// * PermanentDatabaseException -> Failed synchronously, NOT buffered;
|
||||
// * TransientDatabaseException -> buffered to S&F for retry.
|
||||
try
|
||||
{
|
||||
await ExecuteWriteAsync(
|
||||
connectionName, definition.ConnectionString, sql, parameters ?? EmptyParameters, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
// Immediate success — the write is done; do not buffer.
|
||||
return new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: false);
|
||||
}
|
||||
catch (PermanentDatabaseException ex)
|
||||
{
|
||||
// Permanent failures are returned to the script and never buffered —
|
||||
// mirrors the PermanentExternalSystemException branch on the API path.
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"CachedWrite to '{Connection}' failed permanently (SQL error {Number}); returning Failed without buffering.",
|
||||
connectionName, ex.SqlErrorNumber);
|
||||
return new ExternalCallResult(
|
||||
Success: false, ResponseJson: null, ErrorMessage: $"Permanent database error: {ex.Message}", WasBuffered: false);
|
||||
}
|
||||
catch (TransientDatabaseException ex)
|
||||
{
|
||||
// Transient failure — hand to S&F so the retry sweep delivers it.
|
||||
_logger.LogDebug(
|
||||
ex,
|
||||
"CachedWrite to '{Connection}' failed transiently (SQL error {Number}); buffering for retry.",
|
||||
connectionName, ex.SqlErrorNumber);
|
||||
}
|
||||
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
{
|
||||
ConnectionName = connectionName,
|
||||
@@ -119,6 +157,12 @@ public class DatabaseGateway : IDatabaseGateway
|
||||
originInstanceName,
|
||||
definition.MaxRetries > 0 ? definition.MaxRetries : null,
|
||||
definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null,
|
||||
// M2.3 (#7): attemptImmediateDelivery: false — this method already
|
||||
// made the write attempt above (the transient-classified failure is
|
||||
// exactly why we are buffering). Letting EnqueueAsync re-invoke the
|
||||
// delivery handler would execute the same write a second time —
|
||||
// mirrors ExternalSystemClient.CachedCallAsync.
|
||||
attemptImmediateDelivery: false,
|
||||
// Audit Log #23 (M3): pin the S&F message id to the
|
||||
// TrackedOperationId so the retry loop (Bundle E Tasks E4/E5) can
|
||||
// read it back via StoreAndForwardMessage.Id and emit per-attempt +
|
||||
@@ -136,17 +180,29 @@ public class DatabaseGateway : IDatabaseGateway
|
||||
// retry-loop cached-write audit rows correlate back to the
|
||||
// cross-execution chain. Null for a non-routed run.
|
||||
parentExecutionId: parentExecutionId);
|
||||
|
||||
// Buffered for retry — mirrors the API path's WasBuffered=true result.
|
||||
return new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry
|
||||
/// sweep — executes the SQL against the named connection. Returns true on
|
||||
/// success, false if the connection no longer exists (the message is parked);
|
||||
/// throws on any execution error so the engine retries.
|
||||
/// sweep — executes the SQL against the named connection.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// M2.3 (#7): the outcome is classified, mirroring
|
||||
/// <see cref="ExternalSystemClient.DeliverBufferedAsync"/>. Returns
|
||||
/// <c>false</c> — so the S&F engine PARKS the message — when the
|
||||
/// connection no longer exists, the payload is unreadable, or the SQL fails
|
||||
/// with a PERMANENT error (constraint / syntax / permission). A TRANSIENT SQL
|
||||
/// error (<see cref="TransientDatabaseException"/>) propagates so the engine
|
||||
/// retries. The pre-M2.3 code rethrew on ANY SQL error, so a permanent
|
||||
/// failure on the retry path looped forever.
|
||||
/// </remarks>
|
||||
/// <param name="message">The buffered store-and-forward message to deliver.</param>
|
||||
/// <param name="cancellationToken">Cancellation token for the delivery operation.</param>
|
||||
/// <returns>A task that resolves to <c>true</c> on success, or <c>false</c> if the connection no longer exists.</returns>
|
||||
/// <returns>A task that resolves to <c>true</c> on success, or <c>false</c> when the message must be parked.</returns>
|
||||
/// <exception cref="TransientDatabaseException">Thrown on a transient SQL failure so the engine retries.</exception>
|
||||
public async Task<bool> DeliverBufferedAsync(
|
||||
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
@@ -185,22 +241,93 @@ public class DatabaseGateway : IDatabaseGateway
|
||||
return false;
|
||||
}
|
||||
|
||||
await using var connection = new SqlConnection(definition.ConnectionString);
|
||||
await connection.OpenAsync(cancellationToken);
|
||||
using var command = connection.CreateCommand();
|
||||
command.CommandText = payload.Sql;
|
||||
if (payload.Parameters != null)
|
||||
// Materialise the buffered JsonElement parameters into CLR values once,
|
||||
// then run through the shared ExecuteWriteAsync seam so both the
|
||||
// immediate-attempt path and this retry path classify SqlException the
|
||||
// same way.
|
||||
IReadOnlyDictionary<string, object?> materialisedParameters =
|
||||
payload.Parameters == null
|
||||
? EmptyParameters
|
||||
: payload.Parameters.ToDictionary(
|
||||
kv => kv.Key, kv => (object?)JsonElementToParameterValue(kv.Value));
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var (key, value) in payload.Parameters)
|
||||
await ExecuteWriteAsync(
|
||||
payload.ConnectionName, definition.ConnectionString, payload.Sql, materialisedParameters, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return true;
|
||||
}
|
||||
catch (PermanentDatabaseException ex)
|
||||
{
|
||||
// Permanent — parking is correct; retrying the identical statement
|
||||
// cannot succeed. Mirrors ExternalSystemClient.DeliverBufferedAsync
|
||||
// returning false on PermanentExternalSystemException.
|
||||
_logger.LogError(
|
||||
ex,
|
||||
"Buffered DB write to '{Connection}' failed permanently (SQL error {Number}); parking.",
|
||||
payload.ConnectionName, ex.SqlErrorNumber);
|
||||
return false;
|
||||
}
|
||||
// TransientDatabaseException propagates — the S&F engine retries.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reusable empty parameter map so the no-parameter paths do not allocate a
|
||||
/// fresh dictionary each call.
|
||||
/// </summary>
|
||||
private static readonly IReadOnlyDictionary<string, object?> EmptyParameters =
|
||||
new Dictionary<string, object?>();
|
||||
|
||||
/// <summary>
|
||||
/// M2.3 (#7): executes a parameterised SQL write against the given connection
|
||||
/// string and classifies any <see cref="SqlException"/> into
|
||||
/// <see cref="TransientDatabaseException"/> / <see cref="PermanentDatabaseException"/>
|
||||
/// via <see cref="SqlErrorClassifier"/>. This is the single SQL-execution seam
|
||||
/// shared by the immediate <see cref="CachedWriteAsync"/> attempt and the
|
||||
/// <see cref="DeliverBufferedAsync"/> retry path. Marked <c>internal virtual</c>
|
||||
/// so tests can substitute success / transient / permanent outcomes without a
|
||||
/// real SQL Server (and without fabricating a <see cref="SqlException"/>, which
|
||||
/// has no public constructor). Mirrors the role of
|
||||
/// <see cref="ExternalSystemClient.InvokeHttpAsync"/> on the API path.
|
||||
/// </summary>
|
||||
/// <param name="connectionName">The human-readable connection name, used only for the classified error message (never the connection string — that would leak credentials into logs / script-visible errors).</param>
|
||||
/// <param name="connectionString">The ADO.NET connection string to write through.</param>
|
||||
/// <param name="sql">The SQL statement to execute.</param>
|
||||
/// <param name="parameters">Materialised CLR parameter values (may be empty).</param>
|
||||
/// <param name="cancellationToken">Cancellation token for the write.</param>
|
||||
/// <returns>A task that completes when the write succeeds.</returns>
|
||||
/// <exception cref="TransientDatabaseException">Thrown for a transient SQL error number.</exception>
|
||||
/// <exception cref="PermanentDatabaseException">Thrown for a permanent (or unknown) SQL error number.</exception>
|
||||
internal virtual async Task ExecuteWriteAsync(
|
||||
string connectionName,
|
||||
string connectionString,
|
||||
string sql,
|
||||
IReadOnlyDictionary<string, object?> parameters,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var connection = new SqlConnection(connectionString);
|
||||
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||
using var command = connection.CreateCommand();
|
||||
command.CommandText = sql;
|
||||
foreach (var (key, value) in parameters)
|
||||
{
|
||||
var parameter = command.CreateParameter();
|
||||
parameter.ParameterName = key.StartsWith('@') ? key : "@" + key;
|
||||
parameter.Value = JsonElementToParameterValue(value);
|
||||
parameter.Value = value ?? DBNull.Value;
|
||||
command.Parameters.Add(parameter);
|
||||
}
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (SqlException ex)
|
||||
{
|
||||
// Classify by SqlException.Number and rethrow as the strongly-typed
|
||||
// transient / permanent failure the callers branch on. The context
|
||||
// is the connection NAME, never the connection string.
|
||||
throw SqlErrorClassifier.Throw(connectionName, ex);
|
||||
}
|
||||
await command.ExecuteNonQueryAsync(cancellationToken);
|
||||
return true;
|
||||
}
|
||||
|
||||
// ExternalSystemGateway-020: a JSON number that does not fit in Int64 must
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
using Microsoft.Data.SqlClient;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
|
||||
|
||||
/// <summary>
|
||||
/// M2.3 (#7): classifies a SQL Server failure as transient (a brief wait /
|
||||
/// retry may succeed — buffer to store-and-forward) or permanent (the identical
|
||||
/// statement cannot succeed — return to the script / park the buffered message).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// This is the database-side parallel of <see cref="ErrorClassifier"/> (the
|
||||
/// HTTP path). The two are kept separate because the inputs differ: HTTP keys
|
||||
/// off status codes / exception types, SQL keys off
|
||||
/// <see cref="SqlException.Number"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Transient set.</b> Only connection-loss, timeout, deadlock, and Azure SQL
|
||||
/// throttle/availability error numbers are transient — failures whose cause is
|
||||
/// external to the statement and may clear on its own:
|
||||
/// <list type="bullet">
|
||||
/// <item><c>-2</c> — query / command timeout expired.</item>
|
||||
/// <item><c>-1</c> — a connection-level error (general SqlClient connection failure).</item>
|
||||
/// <item><c>2</c> — SQL Server / network instance not found or not accessible.</item>
|
||||
/// <item><c>53</c> — network path to the server was not found.</item>
|
||||
/// <item><c>64</c> — connection terminated mid-session (transport error).</item>
|
||||
/// <item><c>233</c> — no process on the other end of the named pipe.</item>
|
||||
/// <item><c>1205</c> — the session was chosen as a deadlock victim.</item>
|
||||
/// <item><c>10053</c> — transport-level abort (software caused connection abort).</item>
|
||||
/// <item><c>10054</c> — connection reset by peer.</item>
|
||||
/// <item><c>10060</c> — connection attempt timed out.</item>
|
||||
/// <item><c>40197</c> — Azure SQL service error processing the request; retry.</item>
|
||||
/// <item><c>40501</c> — Azure SQL service is busy.</item>
|
||||
/// <item><c>40613</c> — Azure SQL database is currently unavailable.</item>
|
||||
/// <item><c>49918</c> / <c>49919</c> / <c>49920</c> — Azure SQL throttling (too many requests / operations).</item>
|
||||
/// </list>
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Everything else is permanent.</b> Constraint violations (547, 2627, 2601),
|
||||
/// syntax errors (102, 156, 207, 208), and permission errors (229, 230, 262) are
|
||||
/// the obvious permanent cases, but the policy is broader: <b>any error number not
|
||||
/// in the transient set — including unknown / undocumented / ambiguous numbers —
|
||||
/// is treated as permanent.</b> Fail-fast is the safer default: silently
|
||||
/// retrying an unrecognised error forever (the pre-M2.3 behaviour) hides
|
||||
/// authoring bugs and can replay duplicate side effects. A genuinely transient
|
||||
/// number we have not enumerated will, at worst, surface to the script as a
|
||||
/// permanent failure — a loud, fixable outcome — rather than spin in an
|
||||
/// unbounded retry loop.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class SqlErrorClassifier
|
||||
{
|
||||
/// <summary>
|
||||
/// The complete set of SQL Server error numbers treated as transient. See the
|
||||
/// type-level remarks for the per-number rationale. Anything outside this set
|
||||
/// is permanent.
|
||||
/// </summary>
|
||||
private static readonly HashSet<int> TransientErrorNumbers = new()
|
||||
{
|
||||
-2, -1, 2, 53, 64, 233, 1205,
|
||||
10053, 10054, 10060,
|
||||
40197, 40501, 40613,
|
||||
49918, 49919, 49920,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Determines whether a SQL Server error number represents a transient
|
||||
/// failure. Unknown / undocumented numbers default to permanent
|
||||
/// (<see langword="false"/>) — see the type-level remarks.
|
||||
/// </summary>
|
||||
/// <param name="errorNumber">The SQL Server error number (e.g. <see cref="SqlException.Number"/>).</param>
|
||||
/// <returns><see langword="true"/> if the number is in the transient set; otherwise <see langword="false"/>.</returns>
|
||||
public static bool IsTransient(int errorNumber) => TransientErrorNumbers.Contains(errorNumber);
|
||||
|
||||
/// <summary>
|
||||
/// Determines whether a <see cref="SqlException"/> represents a transient
|
||||
/// failure by classifying its top-level <see cref="SqlException.Number"/>.
|
||||
/// </summary>
|
||||
/// <param name="exception">The SQL exception to classify.</param>
|
||||
/// <returns><see langword="true"/> if the exception's error number is transient; otherwise <see langword="false"/>.</returns>
|
||||
public static bool IsTransient(SqlException exception)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(exception);
|
||||
return IsTransient(exception.Number);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Classifies a <see cref="SqlException"/> and rethrows it as the matching
|
||||
/// strongly-typed failure: <see cref="TransientDatabaseException"/> for a
|
||||
/// transient error number, <see cref="PermanentDatabaseException"/> otherwise.
|
||||
/// Mirrors <see cref="ErrorClassifier.AsTransient(string, System.Exception?)"/>
|
||||
/// + the throw of <see cref="PermanentExternalSystemException"/> on the HTTP
|
||||
/// path — the callers then branch on the typed exception rather than on the
|
||||
/// raw <see cref="SqlException"/>.
|
||||
/// </summary>
|
||||
/// <param name="context">A short human-readable description of the failing operation (e.g. the connection name).</param>
|
||||
/// <param name="exception">The SQL exception to classify and wrap.</param>
|
||||
/// <returns>This method never returns normally — it always throws.</returns>
|
||||
/// <exception cref="TransientDatabaseException">Thrown when the error number is transient.</exception>
|
||||
/// <exception cref="PermanentDatabaseException">Thrown when the error number is permanent (the default).</exception>
|
||||
public static Exception Throw(string context, SqlException exception)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(exception);
|
||||
|
||||
if (IsTransient(exception))
|
||||
{
|
||||
throw new TransientDatabaseException(
|
||||
$"Transient SQL error {exception.Number} on {context}: {exception.Message}",
|
||||
exception.Number,
|
||||
exception);
|
||||
}
|
||||
|
||||
throw new PermanentDatabaseException(
|
||||
$"Permanent SQL error {exception.Number} on {context}: {exception.Message}",
|
||||
exception.Number,
|
||||
exception);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signals a transient database failure suitable for store-and-forward retry —
|
||||
/// the SQL-path parallel of <see cref="TransientExternalSystemException"/>.
|
||||
/// </summary>
|
||||
public class TransientDatabaseException : Exception
|
||||
{
|
||||
/// <summary>Gets the SQL Server error number that caused the failure, if known.</summary>
|
||||
public int? SqlErrorNumber { get; }
|
||||
|
||||
/// <summary>Initializes a new <see cref="TransientDatabaseException"/>.</summary>
|
||||
/// <param name="message">The error message.</param>
|
||||
/// <param name="errorNumber">The SQL Server error number, if available.</param>
|
||||
/// <param name="innerException">Optional inner exception (typically the original <see cref="SqlException"/>).</param>
|
||||
public TransientDatabaseException(string message, int? errorNumber = null, Exception? innerException = null)
|
||||
: base(message, innerException)
|
||||
{
|
||||
SqlErrorNumber = errorNumber;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signals a permanent database failure that must not be retried — the SQL-path
|
||||
/// parallel of <see cref="PermanentExternalSystemException"/>. Returned
|
||||
/// synchronously to the calling script on the immediate attempt and parks the
|
||||
/// message on the store-and-forward retry path.
|
||||
/// </summary>
|
||||
public class PermanentDatabaseException : Exception
|
||||
{
|
||||
/// <summary>Gets the SQL Server error number that caused the failure, if known.</summary>
|
||||
public int? SqlErrorNumber { get; }
|
||||
|
||||
/// <summary>Initializes a new <see cref="PermanentDatabaseException"/>.</summary>
|
||||
/// <param name="message">The error message.</param>
|
||||
/// <param name="errorNumber">The SQL Server error number, if available.</param>
|
||||
/// <param name="innerException">Optional inner exception (typically the original <see cref="SqlException"/>).</param>
|
||||
public PermanentDatabaseException(string message, int? errorNumber = null, Exception? innerException = null)
|
||||
: base(message, innerException)
|
||||
{
|
||||
SqlErrorNumber = errorNumber;
|
||||
}
|
||||
}
|
||||
@@ -1326,9 +1326,20 @@ public class ScriptRuntimeContext
|
||||
name, trackedId, target, occurredAtUtc, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
// M2.3 (#7): the gateway now attempts the write immediately and
|
||||
// classifies the outcome (mirroring ExternalSystem.CachedCall). The
|
||||
// result is retained because the immediate paths (WasBuffered=false —
|
||||
// immediate success OR a synchronous permanent failure) bypass the
|
||||
// S&F retry loop entirely, so no retry-loop telemetry ever fires.
|
||||
// This helper must emit the Attempted + CachedResolve terminal rows
|
||||
// itself, otherwise Tracking.Status(id) would stay Submitted forever
|
||||
// and the audit log would be missing the terminal lifecycle. The
|
||||
// WasBuffered=true path is unaffected — the S&F retry loop owns the
|
||||
// Attempted + Resolve emissions there.
|
||||
ExternalCallResult? result;
|
||||
try
|
||||
{
|
||||
await _gateway.CachedWriteAsync(
|
||||
result = await _gateway.CachedWriteAsync(
|
||||
name, sql, parameters, _instanceName, cancellationToken, trackedId,
|
||||
// Audit Log #23 (ExecutionId Task 4): thread the script
|
||||
// execution's ExecutionId + SourceScript so a buffered
|
||||
@@ -1350,9 +1361,148 @@ public class ScriptRuntimeContext
|
||||
throw;
|
||||
}
|
||||
|
||||
// M2.3 (#7): immediate-completion lifecycle — emit the missing
|
||||
// Attempted + CachedResolve rows when the underlying write resolved
|
||||
// without engaging the store-and-forward retry loop (immediate
|
||||
// success or a synchronous permanent failure).
|
||||
if (result is { WasBuffered: false })
|
||||
{
|
||||
await EmitImmediateDbTerminalTelemetryAsync(
|
||||
name, target, trackedId, result, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return trackedId;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// M2.3 (#7): best-effort emission of the immediate-completion lifecycle
|
||||
/// for a <c>Database.CachedWrite</c> that resolved without the S&F
|
||||
/// retry loop — emits an <c>Attempted</c> row then a terminal
|
||||
/// <c>CachedResolve</c> row (<c>Delivered</c> on success, <c>Failed</c> on
|
||||
/// a synchronous permanent SQL error). The DB parallel of
|
||||
/// <see cref="EmitImmediateTerminalTelemetryAsync"/>. Any forwarder
|
||||
/// failure is logged and swallowed (alog.md §7).
|
||||
/// </summary>
|
||||
private async Task EmitImmediateDbTerminalTelemetryAsync(
|
||||
string connectionName,
|
||||
string target,
|
||||
TrackedOperationId trackedId,
|
||||
ExternalCallResult result,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_cachedForwarder == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var occurredAtUtc = DateTime.UtcNow;
|
||||
|
||||
// Status mapping mirrors the API path: success -> Delivered, a
|
||||
// synchronous permanent failure -> Failed. A transient failure never
|
||||
// reaches here (WasBuffered=true), so "the immediate attempt failed
|
||||
// and the operation is done" always means a permanent failure.
|
||||
var auditTerminalStatus = result.Success ? AuditStatus.Delivered : AuditStatus.Failed;
|
||||
var operationalTerminalStatus = result.Success ? "Delivered" : "Failed";
|
||||
|
||||
// --- Attempted row -------------------------------------------------
|
||||
CachedCallTelemetry? attempted = TryBuildDbTerminalTelemetry(
|
||||
connectionName, target, trackedId, occurredAtUtc,
|
||||
AuditKind.DbWriteCached, AuditStatus.Attempted, "Attempted",
|
||||
result, isTerminal: false);
|
||||
|
||||
if (attempted is not null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _cachedForwarder.ForwardAsync(attempted, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Immediate-Attempted telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
|
||||
connectionName, trackedId);
|
||||
}
|
||||
}
|
||||
|
||||
// --- CachedResolve row --------------------------------------------
|
||||
CachedCallTelemetry? resolve = TryBuildDbTerminalTelemetry(
|
||||
connectionName, target, trackedId, occurredAtUtc,
|
||||
AuditKind.CachedResolve, auditTerminalStatus, operationalTerminalStatus,
|
||||
result, isTerminal: true);
|
||||
|
||||
if (resolve is not null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _cachedForwarder.ForwardAsync(resolve, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Immediate-CachedResolve telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
|
||||
connectionName, trackedId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds one immediate-completion <c>DbOutbound</c> telemetry packet, or
|
||||
/// returns <c>null</c> (and logs) when construction throws — so a build
|
||||
/// failure skips emission rather than aborting the script.
|
||||
/// </summary>
|
||||
private CachedCallTelemetry? TryBuildDbTerminalTelemetry(
|
||||
string connectionName,
|
||||
string target,
|
||||
TrackedOperationId trackedId,
|
||||
DateTime occurredAtUtc,
|
||||
AuditKind kind,
|
||||
AuditStatus auditStatus,
|
||||
string operationalStatus,
|
||||
ExternalCallResult result,
|
||||
bool isTerminal)
|
||||
{
|
||||
try
|
||||
{
|
||||
return new CachedCallTelemetry(
|
||||
Audit: ScadaBridgeAuditEventFactory.Create(
|
||||
channel: AuditChannel.DbOutbound,
|
||||
kind: kind,
|
||||
status: auditStatus,
|
||||
occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
||||
target: target,
|
||||
correlationId: trackedId.Value,
|
||||
executionId: _executionId,
|
||||
parentExecutionId: _parentExecutionId,
|
||||
sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
||||
sourceInstanceId: _instanceName,
|
||||
sourceScript: _sourceScript,
|
||||
errorMessage: result.Success ? null : result.ErrorMessage),
|
||||
Operational: new SiteCallOperational(
|
||||
TrackedOperationId: trackedId,
|
||||
Channel: "DbOutbound",
|
||||
Target: target,
|
||||
SourceSite: _siteId,
|
||||
SourceNode: _sourceNode,
|
||||
Status: operationalStatus,
|
||||
RetryCount: 0,
|
||||
LastError: result.Success ? null : result.ErrorMessage,
|
||||
HttpStatus: null,
|
||||
CreatedAtUtc: occurredAtUtc,
|
||||
UpdatedAtUtc: occurredAtUtc,
|
||||
TerminalAtUtc: isTerminal ? occurredAtUtc : null));
|
||||
}
|
||||
catch (Exception buildEx)
|
||||
{
|
||||
_logger.LogWarning(buildEx,
|
||||
"Failed to build immediate-{Kind} telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission",
|
||||
kind, connectionName, trackedId);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EmitCachedDbSubmitTelemetryAsync(
|
||||
string connectionName,
|
||||
TrackedOperationId trackedId,
|
||||
|
||||
@@ -100,7 +100,14 @@ public class DatabaseGatewayTests
|
||||
var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
|
||||
storage, sfOptions, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);
|
||||
|
||||
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance, storeAndForward: sf);
|
||||
// M2.3 (#7): CachedWriteAsync now attempts the write immediately and
|
||||
// only buffers on a TRANSIENT failure. The stub forces a transient
|
||||
// outcome so this test exercises the buffering path deterministically
|
||||
// without a real SQL Server.
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
sf,
|
||||
onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205));
|
||||
|
||||
// Audit Log #23 (ExecutionId Task 4): a known execution id / source
|
||||
// script so the gateway -> EnqueueAsync hop can be asserted below.
|
||||
@@ -157,7 +164,11 @@ public class DatabaseGatewayTests
|
||||
var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
|
||||
storage, sfOptions, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);
|
||||
|
||||
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance, storeAndForward: sf);
|
||||
// M2.3 (#7): force a transient outcome so the write reaches S&F.
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
sf,
|
||||
onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205));
|
||||
|
||||
await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
@@ -167,6 +178,219 @@ public class DatabaseGatewayTests
|
||||
Assert.NotEqual(0, maxRetries);
|
||||
}
|
||||
|
||||
// ── M2.3 (#7): transient-vs-permanent SQL classification on the immediate
|
||||
// cached-write attempt + the buffered retry path ──
|
||||
|
||||
/// <summary>
|
||||
/// Builds a real, initialised in-memory store-and-forward service plus a
|
||||
/// keep-alive connection (the SQLite shared-cache DB lives only while a
|
||||
/// connection is open). The caller disposes <paramref name="keepAlive"/>.
|
||||
/// </summary>
|
||||
private static (ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService Sf, string ConnStr, Microsoft.Data.Sqlite.SqliteConnection KeepAlive)
|
||||
NewStoreAndForward()
|
||||
{
|
||||
var dbName = $"EsgCachedWriteClassify_{Guid.NewGuid():N}";
|
||||
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||
var keepAlive = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
|
||||
keepAlive.Open();
|
||||
var storage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage(
|
||||
connStr, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage>.Instance);
|
||||
storage.InitializeAsync().GetAwaiter().GetResult();
|
||||
var sfOptions = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardOptions
|
||||
{
|
||||
DefaultMaxRetries = 99,
|
||||
DefaultRetryInterval = TimeSpan.FromMinutes(10),
|
||||
RetryTimerInterval = TimeSpan.FromMinutes(10),
|
||||
};
|
||||
var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
|
||||
storage, sfOptions, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);
|
||||
return (sf, connStr, keepAlive);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_PermanentSqlError_ReturnsFailedSynchronously_NotBuffered()
|
||||
{
|
||||
// A constraint/syntax/permission failure on the IMMEDIATE attempt must
|
||||
// be returned to the script as Failed and must NOT be buffered — mirrors
|
||||
// ExternalSystemClient.CachedCallAsync's PermanentExternalSystemException
|
||||
// path.
|
||||
var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
|
||||
StubConnection(conn);
|
||||
|
||||
var (sf, connStr, keepAlive) = NewStoreAndForward();
|
||||
using var _ = keepAlive;
|
||||
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
sf,
|
||||
onExecute: () => throw new PermanentDatabaseException(
|
||||
"Violation of PRIMARY KEY constraint", errorNumber: 2627));
|
||||
|
||||
var result = await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.False(result.Success);
|
||||
Assert.False(result.WasBuffered);
|
||||
Assert.NotNull(result.ErrorMessage);
|
||||
|
||||
// Nothing buffered — the permanent failure short-circuited S&F.
|
||||
Assert.Equal(0, ReadBufferDepth(connStr));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_TransientSqlError_BuffersToStoreAndForward()
|
||||
{
|
||||
// A deadlock / timeout on the IMMEDIATE attempt is transient — the write
|
||||
// is handed to S&F (WasBuffered=true), not returned as Failed.
|
||||
var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test")
|
||||
{
|
||||
Id = 1,
|
||||
MaxRetries = 5,
|
||||
RetryDelay = TimeSpan.FromSeconds(12),
|
||||
};
|
||||
StubConnection(conn);
|
||||
|
||||
var (sf, connStr, keepAlive) = NewStoreAndForward();
|
||||
using var _ = keepAlive;
|
||||
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
sf,
|
||||
onExecute: () => throw new TransientDatabaseException(
|
||||
"Transaction was deadlocked", errorNumber: 1205));
|
||||
|
||||
var result = await gateway.CachedWriteAsync(
|
||||
"testDb", "UPDATE t SET v = 1", new Dictionary<string, object?> { ["x"] = 1 });
|
||||
|
||||
Assert.True(result.Success); // accepted for delivery
|
||||
Assert.True(result.WasBuffered); // handed to S&F, not synchronously failed
|
||||
Assert.Null(result.ErrorMessage);
|
||||
|
||||
Assert.Equal(1, ReadBufferDepth(connStr));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ImmediateSuccess_NotBuffered_ReturnsDelivered()
|
||||
{
|
||||
// A write that succeeds immediately is done — it must NOT be buffered,
|
||||
// and the result reports success (WasBuffered=false), mirroring the API
|
||||
// path's immediate-success behaviour.
|
||||
var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
|
||||
StubConnection(conn);
|
||||
|
||||
var (sf, connStr, keepAlive) = NewStoreAndForward();
|
||||
using var _ = keepAlive;
|
||||
|
||||
var gateway = new ExecuteStubGateway(_repository, sf, onExecute: () => { /* succeeds */ });
|
||||
|
||||
var result = await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.True(result.Success);
|
||||
Assert.False(result.WasBuffered);
|
||||
Assert.Null(result.ErrorMessage);
|
||||
|
||||
Assert.Equal(0, ReadBufferDepth(connStr));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeliverBuffered_TransientSqlError_RethrowsSoEngineRetries()
|
||||
{
|
||||
// On the retry path a transient failure must propagate so the S&F engine
|
||||
// schedules another retry — mirrors ExternalSystemClient.DeliverBuffered
|
||||
// letting TransientExternalSystemException escape.
|
||||
var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
|
||||
StubConnection(conn);
|
||||
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
storeAndForward: null,
|
||||
onExecute: () => throw new TransientDatabaseException("timeout", errorNumber: -2));
|
||||
|
||||
var message = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
|
||||
Target = "testDb",
|
||||
PayloadJson =
|
||||
"""{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
|
||||
};
|
||||
|
||||
await Assert.ThrowsAsync<TransientDatabaseException>(
|
||||
() => gateway.DeliverBufferedAsync(message));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeliverBuffered_PermanentSqlError_ReturnsFalseSoMessageParks()
|
||||
{
|
||||
// On the retry path a permanent failure must park the message (return
|
||||
// false) rather than retry forever — mirrors ExternalSystemClient.
|
||||
// DeliverBuffered returning false on PermanentExternalSystemException.
|
||||
var conn = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
|
||||
StubConnection(conn);
|
||||
|
||||
var gateway = new ExecuteStubGateway(
|
||||
_repository,
|
||||
storeAndForward: null,
|
||||
onExecute: () => throw new PermanentDatabaseException(
|
||||
"Invalid column name", errorNumber: 207));
|
||||
|
||||
var message = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
|
||||
Target = "testDb",
|
||||
PayloadJson =
|
||||
"""{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
|
||||
};
|
||||
|
||||
var delivered = await gateway.DeliverBufferedAsync(message);
|
||||
|
||||
Assert.False(delivered); // permanent — the S&F engine parks the message
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads the current buffered-message count off the S&F SQLite DB by
|
||||
/// counting <c>sf_messages</c> rows (the engine's persistence table).
|
||||
/// </summary>
|
||||
private static int ReadBufferDepth(string connStr)
|
||||
{
|
||||
using var conn = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM sf_messages";
|
||||
return Convert.ToInt32(cmd.ExecuteScalar());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test gateway that substitutes the SQL-execution seam so a test can drive
|
||||
/// success / transient / permanent outcomes without a real SQL Server (and
|
||||
/// without fabricating a <see cref="Microsoft.Data.SqlClient.SqlException"/>,
|
||||
/// which has no public constructor). Production classifies a real
|
||||
/// <c>SqlException</c> into <see cref="TransientDatabaseException"/> /
|
||||
/// <see cref="PermanentDatabaseException"/> at this same seam.
|
||||
/// </summary>
|
||||
private sealed class ExecuteStubGateway : DatabaseGateway
|
||||
{
|
||||
private readonly Action _onExecute;
|
||||
|
||||
public ExecuteStubGateway(
|
||||
IExternalSystemRepository repository,
|
||||
ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService? storeAndForward,
|
||||
Action onExecute)
|
||||
: base(repository, NullLogger<DatabaseGateway>.Instance, storeAndForward)
|
||||
=> _onExecute = onExecute;
|
||||
|
||||
internal override Task ExecuteWriteAsync(
|
||||
string connectionName,
|
||||
string connectionString,
|
||||
string sql,
|
||||
IReadOnlyDictionary<string, object?> parameters,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
_onExecute();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript)
|
||||
ReadBufferedRetrySettings(string connStr)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// M2.3 (#7): unit tests for the transient-vs-permanent SQL error-number
|
||||
/// classifier that <c>DatabaseGateway</c> uses to decide whether a failed
|
||||
/// cached write should be buffered (transient) or returned to the script
|
||||
/// synchronously / parked (permanent).
|
||||
/// </summary>
|
||||
public class SqlErrorClassifierTests
|
||||
{
|
||||
// The full transient set documented on SqlErrorClassifier — connection,
|
||||
// timeout, deadlock, and Azure throttle error numbers. A retry can plausibly
|
||||
// succeed for any of these, so they are buffered to store-and-forward.
|
||||
[Theory]
|
||||
[InlineData(-2)] // timeout expired
|
||||
[InlineData(-1)] // connection error
|
||||
[InlineData(2)] // network / instance not found
|
||||
[InlineData(53)] // network path not found
|
||||
[InlineData(64)] // connection terminated mid-session
|
||||
[InlineData(233)] // no process on the other end of the pipe
|
||||
[InlineData(1205)] // deadlock victim
|
||||
[InlineData(10053)] // transport-level abort
|
||||
[InlineData(10054)] // connection reset by peer
|
||||
[InlineData(10060)] // connection timed out
|
||||
[InlineData(40197)] // Azure SQL service error, retry
|
||||
[InlineData(40501)] // Azure SQL service busy
|
||||
[InlineData(40613)] // Azure SQL database unavailable
|
||||
[InlineData(49918)] // Azure SQL cannot process request (throttle)
|
||||
[InlineData(49919)] // Azure SQL too many create/update operations
|
||||
[InlineData(49920)] // Azure SQL too many operations (throttle)
|
||||
public void IsTransient_KnownTransientNumber_ReturnsTrue(int errorNumber)
|
||||
{
|
||||
Assert.True(SqlErrorClassifier.IsTransient(errorNumber));
|
||||
}
|
||||
|
||||
// Constraint, syntax, and permission errors are permanent — retrying the
|
||||
// identical statement cannot succeed and may cause duplicate side effects.
|
||||
[Theory]
|
||||
[InlineData(547)] // constraint violation (FK/CHECK)
|
||||
[InlineData(2627)] // primary-key / unique constraint violation
|
||||
[InlineData(2601)] // duplicate key in a unique index
|
||||
[InlineData(102)] // incorrect syntax
|
||||
[InlineData(156)] // incorrect syntax near a keyword
|
||||
[InlineData(207)] // invalid column name
|
||||
[InlineData(208)] // invalid object name
|
||||
[InlineData(229)] // permission denied on object
|
||||
[InlineData(230)] // permission denied on column
|
||||
[InlineData(262)] // permission denied (CREATE etc.)
|
||||
public void IsTransient_KnownPermanentNumber_ReturnsFalse(int errorNumber)
|
||||
{
|
||||
Assert.False(SqlErrorClassifier.IsTransient(errorNumber));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(0)] // no error number captured
|
||||
[InlineData(99999)] // unknown / undocumented number
|
||||
[InlineData(12345)]
|
||||
[InlineData(int.MaxValue)]
|
||||
public void IsTransient_UnknownNumber_DefaultsToPermanent(int errorNumber)
|
||||
{
|
||||
// Fail-fast is the safer default: an unrecognised error number must NOT
|
||||
// be silently retried forever. Unknown => permanent => false.
|
||||
Assert.False(SqlErrorClassifier.IsTransient(errorNumber));
|
||||
}
|
||||
}
|
||||
+153
-10
@@ -77,7 +77,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -118,7 +123,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -147,7 +157,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId);
|
||||
@@ -169,7 +184,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -207,7 +227,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -248,7 +273,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId);
|
||||
@@ -281,7 +311,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -310,7 +345,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder
|
||||
{
|
||||
ThrowOnForward = new InvalidOperationException("simulated forwarder outage"),
|
||||
@@ -348,7 +388,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = new ScriptRuntimeContext.DatabaseHelper(
|
||||
@@ -384,7 +429,12 @@ public class DatabaseCachedWriteEmissionTests
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
// M2.3 (#7): CachedWriteAsync now returns an ExternalCallResult. The
|
||||
// buffered result (WasBuffered=true) models the transient-failure
|
||||
// path these enqueue-telemetry tests exercise — only the CachedSubmit
|
||||
// packet is emitted; the S&F retry loop (not the helper) owns the
|
||||
// terminal rows, so Assert.Single(forwarder.Telemetry) still holds.
|
||||
.ReturnsAsync(new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
@@ -393,4 +443,97 @@ public class DatabaseCachedWriteEmissionTests
|
||||
var packet = Assert.Single(forwarder.Telemetry);
|
||||
Assert.Null(packet.Operational.SourceNode);
|
||||
}
|
||||
|
||||
// ── M2.3 (#7): immediate-completion lifecycle (WasBuffered=false) ──
|
||||
|
||||
private static Mock<IDatabaseGateway> GatewayReturning(ExternalCallResult result)
|
||||
{
|
||||
var gateway = new Mock<IDatabaseGateway>();
|
||||
gateway
|
||||
.Setup(g => g.CachedWriteAsync(
|
||||
It.IsAny<string>(), It.IsAny<string>(),
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
It.IsAny<string?>(),
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>(),
|
||||
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
|
||||
.ReturnsAsync(result);
|
||||
return gateway;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ImmediateSuccess_EmitsSubmitAttemptedThenDeliveredResolve()
|
||||
{
|
||||
// An immediate success (WasBuffered=false) bypasses the S&F retry loop,
|
||||
// so the helper itself must emit the Attempted + terminal CachedResolve
|
||||
// rows — mirroring ExternalSystem.CachedCall's immediate-success path.
|
||||
var gateway = GatewayReturning(
|
||||
new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: false));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.Equal(3, forwarder.Telemetry.Count);
|
||||
|
||||
var submit = forwarder.Telemetry[0].Audit.AsRow();
|
||||
Assert.Equal(AuditKind.CachedSubmit, submit.Kind);
|
||||
Assert.Equal(AuditStatus.Submitted, submit.Status);
|
||||
|
||||
var attempted = forwarder.Telemetry[1].Audit.AsRow();
|
||||
Assert.Equal(AuditChannel.DbOutbound, attempted.Channel);
|
||||
Assert.Equal(AuditKind.DbWriteCached, attempted.Kind);
|
||||
Assert.Equal(AuditStatus.Attempted, attempted.Status);
|
||||
|
||||
var resolve = forwarder.Telemetry[2];
|
||||
Assert.Equal(AuditChannel.DbOutbound, resolve.Audit.AsRow().Channel);
|
||||
Assert.Equal(AuditKind.CachedResolve, resolve.Audit.AsRow().Kind);
|
||||
Assert.Equal(AuditStatus.Delivered, resolve.Audit.AsRow().Status);
|
||||
Assert.Equal(trackedId.Value, resolve.Audit.AsRow().CorrelationId);
|
||||
Assert.Equal("Delivered", resolve.Operational.Status);
|
||||
Assert.NotNull(resolve.Operational.TerminalAtUtc);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ImmediatePermanentFailure_EmitsSubmitAttemptedThenFailedResolve()
|
||||
{
|
||||
// A synchronous permanent SQL failure (Success=false, WasBuffered=false)
|
||||
// also bypasses S&F; the terminal CachedResolve must be Failed and the
|
||||
// error message must propagate onto the row.
|
||||
const string error = "Permanent database error: Permanent SQL error 2627 on myDb: ...";
|
||||
var gateway = GatewayReturning(
|
||||
new ExternalCallResult(Success: false, ResponseJson: null, ErrorMessage: error, WasBuffered: false));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.Equal(3, forwarder.Telemetry.Count);
|
||||
|
||||
var resolve = forwarder.Telemetry[2];
|
||||
Assert.Equal(AuditKind.CachedResolve, resolve.Audit.AsRow().Kind);
|
||||
Assert.Equal(AuditStatus.Failed, resolve.Audit.AsRow().Status);
|
||||
Assert.Equal(error, resolve.Audit.AsRow().ErrorMessage);
|
||||
Assert.Equal("Failed", resolve.Operational.Status);
|
||||
Assert.Equal(error, resolve.Operational.LastError);
|
||||
Assert.NotNull(resolve.Operational.TerminalAtUtc);
|
||||
Assert.NotEqual(default, trackedId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_BufferedTransient_EmitsOnlySubmit_NoTerminalRows()
|
||||
{
|
||||
// The WasBuffered=true path must NOT emit Attempted / CachedResolve — the
|
||||
// S&F retry loop owns those. Only the CachedSubmit row is emitted by the
|
||||
// helper.
|
||||
var gateway = GatewayReturning(
|
||||
new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true));
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
var packet = Assert.Single(forwarder.Telemetry);
|
||||
Assert.Equal(AuditKind.CachedSubmit, packet.Audit.AsRow().Kind);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user