Files
ScadaBridge/tests/ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs
T
Joseph Doherty de375ff7ea fix(db): classify non-SqlException DB outages as transient; propagate cancellation (#7)
ExecuteWriteAsync only caught SqlException, so a live outage surfacing as
InvalidOperationException/SocketException/IOException/TimeoutException escaped
unclassified and crashed the script actor instead of buffering. Mirror the HTTP
path: propagate OperationCanceledException on cancellation, classify transport
exceptions as transient (buffer+retry), let unexpected exceptions propagate.
2026-06-15 14:03:25 -04:00

740 lines
32 KiB
C#

using System.Data;
using System.Data.Common;
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.ExternalSystems;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
namespace ZB.MOM.WW.ScadaBridge.ExternalSystemGateway.Tests;
/// <summary>
/// WP-9: Tests for Database access — connection resolution, cached writes.
/// </summary>
public class DatabaseGatewayTests
{
private readonly IExternalSystemRepository _repository = Substitute.For<IExternalSystemRepository>();
/// <summary>
/// Registers <paramref name="connection"/> on the repository substitute for the
/// name-keyed resolution path used by <c>DatabaseGateway</c>
/// (ExternalSystemGateway-011): a lookup of the connection's <c>Name</c>, with any
/// cancellation token, returns the supplied definition. Passing <c>null</c> models
/// "not found" and registers nothing — the substitute returns <c>null</c> by
/// default, so the absent entity needs no stub.
/// </summary>
/// <param name="connection">The definition to expose by name, or <c>null</c> to leave the repository unstubbed.</param>
private void StubConnection(DatabaseConnectionDefinition? connection)
{
    if (connection is null)
    {
        return;
    }

    _repository
        .GetDatabaseConnectionByNameAsync(connection.Name, Arg.Any<CancellationToken>())
        .Returns(connection);
}
[Fact]
public async Task GetConnection_NotFound_Throws()
{
    // Arrange: nothing is stubbed, so the requested name does not resolve.
    StubConnection(connection: null);
    DatabaseGateway sut = new(_repository, NullLogger<DatabaseGateway>.Instance);

    // Act + Assert: resolving an unknown connection name is reported as an
    // InvalidOperationException.
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetConnectionAsync("nonexistent"));
}
[Fact]
public async Task CachedWrite_NoStoreAndForward_Throws()
{
    // Arrange: the connection name resolves, but the gateway is built with an
    // explicit null store-and-forward service.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    DatabaseGateway sut = new(
        _repository,
        NullLogger<DatabaseGateway>.Instance,
        storeAndForward: null);

    // Act + Assert: a cached write without a store-and-forward service is rejected.
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"));
}
[Fact]
public async Task CachedWrite_ConnectionNotFound_Throws()
{
    // Arrange: nothing is stubbed, so "nonexistent" does not resolve.
    StubConnection(connection: null);
    DatabaseGateway sut = new(_repository, NullLogger<DatabaseGateway>.Instance);

    // Act + Assert: a cached write against an unknown connection name is rejected.
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.CachedWriteAsync("nonexistent", "INSERT INTO t VALUES (1)"));
}
// ── ExternalSystemGateway-014: CachedWrite happy-path buffering ──
[Fact]
public async Task CachedWrite_BuffersTheWriteWithConnectionRetrySettings()
{
    // Arrange: a connection that carries its own retry settings (5 retries, 12 s).
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test")
    {
        Id = 1,
        MaxRetries = 5,
        RetryDelay = TimeSpan.FromSeconds(12),
    };
    StubConnection(definition);

    // A uniquely named shared-cache in-memory SQLite database backs the S&F
    // store; this connection is held open for the duration of the test.
    var sqliteDbName = $"EsgCachedWrite_{Guid.NewGuid():N}";
    var sqliteConnStr = $"Data Source={sqliteDbName};Mode=Memory;Cache=Shared";
    using var sqliteKeepAlive = new Microsoft.Data.Sqlite.SqliteConnection(sqliteConnStr);
    sqliteKeepAlive.Open();

    var storage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage(
        sqliteConnStr,
        NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage>.Instance);
    await storage.InitializeAsync();

    // The S&F defaults (99 retries / 10 minutes) differ from the connection's
    // settings, so the assertions below distinguish which values were stored.
    var options = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardOptions
    {
        DefaultMaxRetries = 99,
        DefaultRetryInterval = TimeSpan.FromMinutes(10),
        RetryTimerInterval = TimeSpan.FromMinutes(10),
    };
    var storeAndForward = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
        storage,
        options,
        NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);

    // M2.3 (#7): CachedWriteAsync attempts the write immediately and only
    // buffers on a TRANSIENT failure, so the stub forces a transient outcome to
    // reach the buffering path deterministically without a real SQL Server.
    var sut = new ExecuteStubGateway(
        _repository,
        storeAndForward,
        onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205));

    // Audit Log #23 (ExecutionId Task 4): a known execution id / source script
    // so the gateway -> EnqueueAsync hop can be asserted below.
    const string sourceScript = "ScriptActor:WriteAudit";
    var executionId = Guid.NewGuid();
    var sqlParameters = new Dictionary<string, object?> { ["v"] = 1 };

    // Act
    await sut.CachedWriteAsync(
        "testDb",
        "INSERT INTO t VALUES (@v)",
        sqlParameters,
        executionId: executionId,
        sourceScript: sourceScript);

    // Assert: exactly one cached DB write was buffered...
    var depthByCategory = await storage.GetBufferDepthByCategoryAsync();
    Assert.Equal(
        1,
        depthByCategory[ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite]);

    // ...with the connection's retry settings rather than the S&F defaults...
    var (storedMaxRetries, storedRetryIntervalMs, storedExecutionId, storedSourceScript) =
        ReadBufferedRetrySettings(sqliteConnStr);
    Assert.Equal(5, storedMaxRetries);
    Assert.Equal((long)TimeSpan.FromSeconds(12).TotalMilliseconds, storedRetryIntervalMs);

    // ...and with executionId / sourceScript forwarded into EnqueueAsync and
    // persisted on the sf_messages row (ExecutionId Task 4), so the retry loop
    // can stamp the right provenance.
    Assert.Equal(executionId, storedExecutionId);
    Assert.Equal(sourceScript, storedSourceScript);
}
[Fact]
public async Task CachedWrite_ZeroMaxRetriesIsTreatedAsUnsetNotRetryForever()
{
    // ExternalSystemGateway-015: the Store-and-Forward retry sweep reads a stored
    // MaxRetries of 0 as "no limit" (retry forever), and 0 is also the entity's
    // non-nullable int default. The gateway must therefore treat a connection
    // MaxRetries of 0 as "unset" and pass null, so the bounded S&F default
    // applies instead of 0.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test")
    {
        Id = 1,
        MaxRetries = 0,
        RetryDelay = TimeSpan.FromSeconds(3),
    };
    StubConnection(definition);

    // Uniquely named shared-cache in-memory SQLite database, held open for the test.
    var sqliteDbName = $"EsgCachedWriteZero_{Guid.NewGuid():N}";
    var sqliteConnStr = $"Data Source={sqliteDbName};Mode=Memory;Cache=Shared";
    using var sqliteKeepAlive = new Microsoft.Data.Sqlite.SqliteConnection(sqliteConnStr);
    sqliteKeepAlive.Open();

    var storage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage(
        sqliteConnStr,
        NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage>.Instance);
    await storage.InitializeAsync();

    var options = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardOptions
    {
        DefaultMaxRetries = 99,
        DefaultRetryInterval = TimeSpan.FromMinutes(10),
        RetryTimerInterval = TimeSpan.FromMinutes(10),
    };
    var storeAndForward = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
        storage,
        options,
        NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);

    // M2.3 (#7): force a transient outcome so the write reaches S&F.
    var sut = new ExecuteStubGateway(
        _repository,
        storeAndForward,
        onExecute: () => throw new TransientDatabaseException("deadlock", errorNumber: 1205));

    // Act
    await sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");

    // Assert: the stored value is the bounded S&F default, never 0 — a stored
    // 0 would mean retry-forever.
    var buffered = ReadBufferedRetrySettings(sqliteConnStr);
    Assert.Equal(99, buffered.MaxRetries);
    Assert.NotEqual(0, buffered.MaxRetries);
}
// ── M2.3 (#7): transient-vs-permanent SQL classification on the immediate
// cached-write attempt + the buffered retry path ──
/// <summary>
/// Builds a real, initialised in-memory store-and-forward service plus a
/// keep-alive connection (the SQLite shared-cache DB lives only while a
/// connection is open).
/// </summary>
/// <returns>
/// The service (<c>Sf</c>), the SQLite connection string of its backing store
/// (<c>ConnStr</c>), and the open keep-alive connection (<c>KeepAlive</c>), which
/// the caller must dispose.
/// </returns>
/// <remarks>
/// If setup fails after the keep-alive connection has been created, the
/// connection is disposed here before the exception is rethrown — the caller
/// never receives it in that case and so could not dispose it.
/// </remarks>
private static (ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService Sf, string ConnStr, Microsoft.Data.Sqlite.SqliteConnection KeepAlive)
NewStoreAndForward()
{
    var dbName = $"EsgCachedWriteClassify_{Guid.NewGuid():N}";
    var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
    var keepAlive = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
    try
    {
        keepAlive.Open();
        var storage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage(
            connStr, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardStorage>.Instance);

        // This helper is synchronous (its callers deconstruct the tuple
        // directly), so the asynchronous initialisation is blocked on here.
        storage.InitializeAsync().GetAwaiter().GetResult();

        var sfOptions = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardOptions
        {
            DefaultMaxRetries = 99,
            DefaultRetryInterval = TimeSpan.FromMinutes(10),
            RetryTimerInterval = TimeSpan.FromMinutes(10),
        };
        var sf = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService(
            storage, sfOptions, NullLogger<ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService>.Instance);
        return (sf, connStr, keepAlive);
    }
    catch
    {
        // Ownership has not been handed to the caller yet; release the
        // connection rather than leaking it.
        keepAlive.Dispose();
        throw;
    }
}
[Fact]
public async Task CachedWrite_PermanentSqlError_ReturnsFailedSynchronously_NotBuffered()
{
    // A constraint/syntax/permission failure on the IMMEDIATE attempt must be
    // handed back to the script as Failed and must NOT be buffered — mirrors
    // ExternalSystemClient.CachedCallAsync's PermanentExternalSystemException
    // path.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    var sut = new ExecuteStubGateway(
        _repository,
        store.Sf,
        onExecute: () => throw new PermanentDatabaseException(
            "Violation of PRIMARY KEY constraint", errorNumber: 2627));

    // Act
    var outcome = await sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");

    // Assert: a synchronous failure carrying an error message...
    Assert.False(outcome.Success);
    Assert.False(outcome.WasBuffered);
    Assert.NotNull(outcome.ErrorMessage);

    // ...and nothing buffered — the permanent failure short-circuited S&F.
    Assert.Equal(0, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task CachedWrite_TransientSqlError_BuffersToStoreAndForward()
{
    // A deadlock / timeout on the IMMEDIATE attempt is transient: the write is
    // handed to S&F (WasBuffered=true) instead of being returned as Failed.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test")
    {
        Id = 1,
        MaxRetries = 5,
        RetryDelay = TimeSpan.FromSeconds(12),
    };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    var sut = new ExecuteStubGateway(
        _repository,
        store.Sf,
        onExecute: () => throw new TransientDatabaseException(
            "Transaction was deadlocked", errorNumber: 1205));
    var sqlParameters = new Dictionary<string, object?> { ["x"] = 1 };

    // Act
    var outcome = await sut.CachedWriteAsync("testDb", "UPDATE t SET v = 1", sqlParameters);

    // Assert
    Assert.True(outcome.Success);      // accepted for delivery
    Assert.True(outcome.WasBuffered);  // handed to S&F, not synchronously failed
    Assert.Null(outcome.ErrorMessage);
    Assert.Equal(1, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task CachedWrite_ImmediateSuccess_NotBuffered_ReturnsDelivered()
{
    // A write that succeeds on the immediate attempt is finished: it must NOT be
    // buffered and the result reports success with WasBuffered=false, mirroring
    // the API path's immediate-success behaviour.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    // The execution callback does nothing, i.e. the write succeeds.
    var sut = new ExecuteStubGateway(_repository, store.Sf, onExecute: () => { /* succeeds */ });

    // Act
    var outcome = await sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");

    // Assert
    Assert.True(outcome.Success);
    Assert.False(outcome.WasBuffered);
    Assert.Null(outcome.ErrorMessage);
    Assert.Equal(0, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task DeliverBuffered_TransientSqlError_RethrowsSoEngineRetries()
{
    // On the retry path a transient failure must escape the delivery handler so
    // the S&F engine schedules another retry — mirrors
    // ExternalSystemClient.DeliverBuffered letting
    // TransientExternalSystemException escape.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var sut = new ExecuteStubGateway(
        _repository,
        storeAndForward: null,
        onExecute: () => throw new TransientDatabaseException("timeout", errorNumber: -2));

    // A buffered cached-DB-write message addressed to the stubbed connection.
    var bufferedWrite = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
    {
        Id = Guid.NewGuid().ToString("N"),
        Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
        Target = "testDb",
        PayloadJson =
            """{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
    };

    // Act + Assert
    await Assert.ThrowsAsync<TransientDatabaseException>(
        () => sut.DeliverBufferedAsync(bufferedWrite));
}
[Fact]
public async Task DeliverBuffered_PermanentSqlError_ReturnsFalseSoMessageParks()
{
    // On the retry path a permanent failure must park the message (return false)
    // rather than retry forever — mirrors ExternalSystemClient.DeliverBuffered
    // returning false on PermanentExternalSystemException.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var sut = new ExecuteStubGateway(
        _repository,
        storeAndForward: null,
        onExecute: () => throw new PermanentDatabaseException(
            "Invalid column name", errorNumber: 207));

    // A buffered cached-DB-write message addressed to the stubbed connection.
    var bufferedWrite = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
    {
        Id = Guid.NewGuid().ToString("N"),
        Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
        Target = "testDb",
        PayloadJson =
            """{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
    };

    // Act
    var wasDelivered = await sut.DeliverBufferedAsync(bufferedWrite);

    // Assert: permanent — the S&F engine parks the message.
    Assert.False(wasDelivered);
}
// ── M2.3 (#7) code-review fix: ExecuteWriteAsync must classify NON-SqlException
// DB outages as transient (buffer+retry) and propagate cancellation —
// mirroring the HTTP path's ordered catches in InvokeHttpAsync. The pre-fix
// code only caught SqlException, so a live outage surfacing as
// InvalidOperationException / SocketException / IOException / TimeoutException
// escaped unclassified and crashed the Script Execution Actor instead of
// buffering. These tests drive the RAW execution seam (RunSqlAsync) so the
// PRODUCTION classification in ExecuteWriteAsync runs end-to-end. ──
/// <summary>
/// Theory data: one row per exception shape in which a live DB outage can
/// surface WITHOUT being a <c>SqlException</c>. Connection-state, socket, IO and
/// timeout failures are all retryable transport errors.
/// </summary>
/// <returns>Four single-element rows, each holding the exception to throw.</returns>
public static IEnumerable<object[]> TransientNonSqlOutages()
{
    return new object[][]
    {
        new object[] { new InvalidOperationException("The connection is not open.") },
        new object[] { new System.Net.Sockets.SocketException(10061 /* connection refused */) },
        new object[] { new System.IO.IOException("Unable to read data from the transport connection.") },
        new object[] { new TimeoutException("The operation has timed out.") },
    };
}
[Theory]
[MemberData(nameof(TransientNonSqlOutages))]
public async Task CachedWrite_NonSqlOutage_ClassifiedTransient_BuffersNotCrash(Exception outage)
{
    // [1] A live outage that is NOT a SqlException must be classified TRANSIENT
    // (buffered for retry). It must neither escape unclassified and crash the
    // script actor nor come back as a permanent Failed result.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test")
    {
        Id = 1,
        MaxRetries = 5,
        RetryDelay = TimeSpan.FromSeconds(12),
    };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    // RawExecuteStubGateway throws the raw exception from RunSqlAsync so that it
    // flows through the PRODUCTION ExecuteWriteAsync classification (the seam
    // under test), unlike ExecuteStubGateway which throws an already-classified
    // exception.
    var sut = new RawExecuteStubGateway(_repository, store.Sf, onRunSql: () => throw outage);

    // Act
    var outcome = await sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");

    // Assert
    Assert.True(outcome.Success);       // accepted for delivery, not a crash
    Assert.True(outcome.WasBuffered);   // handed to S&F as transient
    Assert.Null(outcome.ErrorMessage);  // not a permanent Failed result
    Assert.Equal(1, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task CachedWrite_CancellationRequested_PropagatesOperationCanceled_NotReclassified()
{
    // [2] An OperationCanceledException raised while the caller's token is
    // cancelled must propagate UNCHANGED — never reclassified as a transient DB
    // error and never buffered. Mirrors the HTTP path's first catch:
    // `catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) throw;`
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    // The token is already cancelled before the write is attempted.
    using var cancellation = new CancellationTokenSource();
    cancellation.Cancel();

    var sut = new RawExecuteStubGateway(
        _repository,
        store.Sf,
        onRunSql: () => throw new OperationCanceledException(cancellation.Token));

    // Act + Assert: the exact exception type escapes the gateway.
    await Assert.ThrowsAsync<OperationCanceledException>(
        () => sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)", cancellationToken: cancellation.Token));

    // Cancellation is not a transient failure — nothing must have been buffered.
    Assert.Equal(0, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task CachedWrite_UnexpectedException_Propagates_NotClassifiedTransient()
{
    // An exception type outside the transient transport set (e.g.
    // ArgumentException) is NOT a DB outage. It must propagate, exactly as the
    // HTTP path lets genuinely-unexpected exceptions escape past
    // `catch (Exception ex) when (ErrorClassifier.IsTransient(ex))`.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var store = NewStoreAndForward();
    using var keepAlive = store.KeepAlive;

    var sut = new RawExecuteStubGateway(
        _repository,
        store.Sf,
        onRunSql: () => throw new ArgumentException("authoring bug"));

    // Act + Assert: the original exception type escapes...
    await Assert.ThrowsAsync<ArgumentException>(
        () => sut.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"));

    // ...and the write was not buffered.
    Assert.Equal(0, ReadBufferDepth(store.ConnStr));
}
[Fact]
public async Task DeliverBuffered_NonSqlOutage_RethrowsAsTransient_SoEngineRetries()
{
    // [1] on the RETRY path: a non-SqlException outage during delivery must be
    // classified transient and propagate (as TransientDatabaseException) so the
    // S&F engine schedules another retry — it must NOT crash/park.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var sut = new RawExecuteStubGateway(
        _repository,
        storeAndForward: null,
        onRunSql: () => throw new InvalidOperationException("The connection is not open."));

    // A buffered cached-DB-write message addressed to the stubbed connection.
    var bufferedWrite = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
    {
        Id = Guid.NewGuid().ToString("N"),
        Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
        Target = "testDb",
        PayloadJson =
            """{"ConnectionName":"testDb","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
    };

    // Act + Assert
    await Assert.ThrowsAsync<TransientDatabaseException>(
        () => sut.DeliverBufferedAsync(bufferedWrite));
}
/// <summary>
/// Counts the rows currently in the <c>sf_messages</c> table (the engine's
/// persistence table) of the S&amp;F SQLite DB, i.e. the number of buffered
/// messages.
/// </summary>
/// <param name="connStr">SQLite connection string of the S&amp;F database to inspect.</param>
/// <returns>The <c>COUNT(*)</c> of <c>sf_messages</c> converted to <see cref="int"/>.</returns>
private static int ReadBufferDepth(string connStr)
{
    using var connection = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
    connection.Open();

    using var countCommand = connection.CreateCommand();
    countCommand.CommandText = "SELECT COUNT(*) FROM sf_messages";

    object? rowCount = countCommand.ExecuteScalar();
    return Convert.ToInt32(rowCount);
}
/// <summary>
/// Test gateway that replaces the SQL-execution seam
/// (<c>ExecuteWriteAsync</c>) with a caller-supplied callback, so a test can
/// drive success / transient / permanent outcomes without a real SQL Server
/// (and without fabricating a <see cref="Microsoft.Data.SqlClient.SqlException"/>,
/// which has no public constructor). Production classifies a real
/// <c>SqlException</c> into <see cref="TransientDatabaseException"/> /
/// <see cref="PermanentDatabaseException"/> at this same seam.
/// </summary>
private sealed class ExecuteStubGateway : DatabaseGateway
{
    /// <summary>Callback run in place of the real write; it may throw to simulate a failure.</summary>
    private readonly Action _executeCallback;

    public ExecuteStubGateway(
        IExternalSystemRepository repository,
        ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService? storeAndForward,
        Action onExecute)
        : base(repository, NullLogger<DatabaseGateway>.Instance, storeAndForward)
    {
        _executeCallback = onExecute;
    }

    /// <summary>
    /// Ignores every argument and runs the callback synchronously; when the
    /// callback returns normally an already-completed task is returned.
    /// </summary>
    internal override Task ExecuteWriteAsync(
        string connectionName,
        string connectionString,
        string sql,
        IReadOnlyDictionary<string, object?> parameters,
        CancellationToken cancellationToken)
    {
        _executeCallback();
        return Task.CompletedTask;
    }
}
/// <summary>
/// Test gateway that replaces the INNER SQL-execution seam
/// (<c>RunSqlAsync</c>) with a caller-supplied callback, so a test can throw RAW
/// exceptions (a real outage shape: <see cref="InvalidOperationException"/>,
/// <see cref="System.Net.Sockets.SocketException"/>, etc.) and have them flow
/// through the PRODUCTION <c>ExecuteWriteAsync</c> classification (the catch
/// ordering under test). <see cref="ExecuteStubGateway"/>, by contrast, throws
/// an already-classified <see cref="TransientDatabaseException"/> /
/// <see cref="PermanentDatabaseException"/> and so bypasses those catches.
/// </summary>
private sealed class RawExecuteStubGateway : DatabaseGateway
{
    /// <summary>Callback run in place of the real SQL execution; it may throw to simulate an outage.</summary>
    private readonly Action _runSqlCallback;

    public RawExecuteStubGateway(
        IExternalSystemRepository repository,
        ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardService? storeAndForward,
        Action onRunSql)
        : base(repository, NullLogger<DatabaseGateway>.Instance, storeAndForward)
    {
        _runSqlCallback = onRunSql;
    }

    /// <summary>
    /// Ignores every argument and runs the callback synchronously; when the
    /// callback returns normally an already-completed task is returned.
    /// </summary>
    internal override Task RunSqlAsync(
        string connectionString,
        string sql,
        IReadOnlyDictionary<string, object?> parameters,
        CancellationToken cancellationToken)
    {
        _runSqlCallback();
        return Task.CompletedTask;
    }
}
/// <summary>
/// Reads the retry settings and provenance columns of the single buffered
/// message in <c>sf_messages</c>, asserting that exactly one row exists.
/// </summary>
/// <param name="connStr">SQLite connection string of the S&amp;F database to inspect.</param>
/// <returns>
/// The row's <c>max_retries</c>, <c>retry_interval_ms</c>, <c>execution_id</c>
/// (parsed as a <see cref="Guid"/>, or <c>null</c> when the column is NULL) and
/// <c>source_script</c> (or <c>null</c> when the column is NULL).
/// </returns>
private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript)
ReadBufferedRetrySettings(string connStr)
{
    using var connection = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
    connection.Open();

    using var query = connection.CreateCommand();
    query.CommandText =
        "SELECT max_retries, retry_interval_ms, execution_id, source_script FROM sf_messages";

    using var row = query.ExecuteReader();
    Assert.True(row.Read(), "expected exactly one buffered message");

    var maxRetries = row.GetInt32(0);
    var retryIntervalMs = row.GetInt64(1);

    Guid? executionId = null;
    if (!row.IsDBNull(2))
    {
        executionId = Guid.Parse(row.GetString(2));
    }

    string? sourceScript = null;
    if (!row.IsDBNull(3))
    {
        sourceScript = row.GetString(3);
    }

    // A second row would mean more than one message was buffered.
    Assert.False(row.Read(), "expected exactly one buffered message");
    return (maxRetries, retryIntervalMs, executionId, sourceScript);
}
// ── ExternalSystemGateway-001: buffered CachedDbWrite delivery handler ──
[Fact]
public async Task DeliverBuffered_ConnectionNoLongerExists_ReturnsFalseSoMessageParks()
{
    // Arrange: nothing is stubbed, so the message's connection name ("gone-db")
    // does not resolve.
    StubConnection(connection: null);
    DatabaseGateway sut = new(_repository, NullLogger<DatabaseGateway>.Instance);

    var orphanedWrite = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
    {
        Id = Guid.NewGuid().ToString("N"),
        Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
        Target = "gone-db",
        PayloadJson =
            """{"ConnectionName":"gone-db","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
    };

    // Act
    var wasDelivered = await sut.DeliverBufferedAsync(orphanedWrite);

    // Assert: permanent — the S&F engine parks the message.
    Assert.False(wasDelivered);
}
// ── ExternalSystemGateway-018: malformed JSON payload must park, not retry-forever ──
[Fact]
public async Task DeliverBuffered_MalformedJsonPayload_ReturnsFalseSoMessageParks()
{
    // No connection stub is needed: deserialization fails before any resolution
    // or SQL execution. If the JsonException were to escape (the pre-018
    // behaviour) the S&F engine would treat it as transient and retry the same
    // poison row forever.
    DatabaseGateway sut = new(_repository, NullLogger<DatabaseGateway>.Instance);

    var poisonMessage = new ZB.MOM.WW.ScadaBridge.StoreAndForward.StoreAndForwardMessage
    {
        Id = Guid.NewGuid().ToString("N"),
        Category = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
        Target = "someDb",
        // Truncated mid-write — `{` opens an object that never closes.
        PayloadJson = "{\"ConnectionName\":\"someDb\",\"Sql\":\"INSERT",
    };

    // Act
    var wasDelivered = await sut.DeliverBufferedAsync(poisonMessage);

    // Assert: permanent — the S&F engine parks the message.
    Assert.False(wasDelivered);
}
// ── ExternalSystemGateway-020: decimal SQL parameter precision survives JsonElement round-trip ──
[Fact]
public void JsonElementToParameterValue_DecimalShapedNumber_PreservesPrecisionViaDecimal()
{
    // Background: a script's decimal SQL parameter is serialised as a bare JSON
    // number (System.Text.Json has no decimal type tag). On the cached-write
    // retry path the buffered payload is re-deserialised into a JsonElement and
    // the gateway must materialise a CLR value for the parameter. Pre-020 it
    // called GetDouble() for any non-Int64 number, silently downcasting every
    // decimal to a binary float and losing precision
    // (1234567890.1234567890 -> 1234567890.1234567 as double). The 020 fix
    // prefers decimal, so the round trip must keep every digit of an
    // authoring-time decimal value.
    const string authoredJson = "1234567890.1234567890";

    // Act: round-trip through JsonElement, mirroring the buffered-payload path.
    using var parsed = JsonDocument.Parse(authoredJson);
    var materialised = DatabaseGateway.JsonElementToParameterValue(parsed.RootElement);

    // Assert: the value is a decimal, not a double. The type check alone fails
    // pre-020 (which produced a System.Double); the value check locks in the
    // precision invariant.
    var asDecimal = Assert.IsType<decimal>(materialised);
    Assert.Equal(1234567890.1234567890m, asDecimal);
}
[Fact]
public void JsonElementToParameterValue_WholeNumber_FastPathReturnsLong()
{
    // Whole numbers must keep the existing Int64 fast path — the 020 fix is
    // "long first, then decimal, then double", not "decimal first".
    using var document = JsonDocument.Parse("42");
    var element = document.RootElement;

    var materialised = DatabaseGateway.JsonElementToParameterValue(element);

    // Assert.IsType returns the value already typed as long, so the value check
    // is a typed long comparison rather than a boxed object comparison.
    var asLong = Assert.IsType<long>(materialised);
    Assert.Equal(42L, asLong);
}
[Fact]
public void JsonElementToParameterValue_OutOfDecimalRangeNumber_FallsThroughToDouble()
{
    // A genuinely out-of-decimal-range value (e.g. very large scientific
    // notation) must still fall through to double rather than throw — the
    // decimal probe is a precision-preserving preference, not a hard
    // requirement.
    using var document = JsonDocument.Parse("1e40");
    var element = document.RootElement;

    var materialised = DatabaseGateway.JsonElementToParameterValue(element);

    // Assert.IsType returns the value already typed as double, which avoids
    // unboxing the untyped result with a second cast.
    var asDouble = Assert.IsType<double>(materialised);
    Assert.Equal(1e40, asDouble);
}
// ── ExternalSystemGateway-010: SqlConnection must not leak when OpenAsync fails ──
[Fact]
public async Task GetConnection_OpenFails_DisposesConnectionBeforeRethrowing()
{
    // Arrange: the connection name resolves, but the connection handed out by
    // the factory seam fails to open and records whether it was disposed.
    var definition = new DatabaseConnectionDefinition("testDb", "Server=localhost;Database=test") { Id = 1 };
    StubConnection(definition);

    var failingConnection = new ThrowingDbConnection();
    var sut = new ConnectionFactoryStubGateway(_repository, failingConnection);

    // Act + Assert: the open failure reaches the caller...
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetConnectionAsync("testDb"));

    // ...and the connection was disposed rather than leaked.
    Assert.True(
        failingConnection.WasDisposed,
        "The SqlConnection was leaked — it must be disposed when OpenAsync fails");
}
/// <summary>
/// Test gateway whose connection factory (<c>CreateConnection</c>) always hands
/// back the stub connection supplied at construction, whatever connection string
/// is requested.
/// </summary>
private sealed class ConnectionFactoryStubGateway : DatabaseGateway
{
    private readonly DbConnection _stubConnection;

    public ConnectionFactoryStubGateway(IExternalSystemRepository repository, DbConnection connection)
        : base(repository, NullLogger<DatabaseGateway>.Instance)
    {
        _stubConnection = connection;
    }

    /// <summary>Returns the stub connection; <paramref name="connectionString"/> is ignored.</summary>
    internal override DbConnection CreateConnection(string connectionString)
    {
        return _stubConnection;
    }
}
/// <summary>
/// A <see cref="DbConnection"/> test double whose <c>Open</c> and
/// <c>OpenAsync</c> always throw <see cref="InvalidOperationException"/>, and
/// which records in <see cref="WasDisposed"/> whether it has been disposed
/// (synchronously or asynchronously).
/// </summary>
private sealed class ThrowingDbConnection : DbConnection
{
    /// <summary><c>true</c> once <c>Dispose(true)</c> or <c>DisposeAsync</c> has run.</summary>
    public bool WasDisposed { get; private set; }

    // ── Opening always fails ──

    public override void Open()
    {
        throw new InvalidOperationException("simulated open failure");
    }

    // Throws directly from the call rather than returning a faulted task.
    public override Task OpenAsync(CancellationToken cancellationToken)
    {
        throw new InvalidOperationException("simulated open failure");
    }

    // ── Disposal tracking ──

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            WasDisposed = true;
        }

        base.Dispose(disposing);
    }

    public override ValueTask DisposeAsync()
    {
        WasDisposed = true;
        return base.DisposeAsync();
    }

    // ── Abstract members the tests never exercise ──

    [System.Diagnostics.CodeAnalysis.AllowNull]
    public override string ConnectionString { get; set; } = string.Empty;

    public override string Database
    {
        get { return string.Empty; }
    }

    public override string DataSource
    {
        get { return string.Empty; }
    }

    public override string ServerVersion
    {
        get { return string.Empty; }
    }

    public override ConnectionState State
    {
        get { return ConnectionState.Closed; }
    }

    public override void ChangeDatabase(string databaseName)
    {
        throw new NotSupportedException();
    }

    public override void Close()
    {
    }

    protected override DbTransaction BeginDbTransaction(IsolationLevel il)
    {
        throw new NotSupportedException();
    }

    protected override DbCommand CreateDbCommand()
    {
        throw new NotSupportedException();
    }
}
}