feat(site): older-write guard for replicated config writes

Add StoreDeployedConfigIfNewerAsync to SiteStorageService — an atomic
conditional upsert using SQLite's ON CONFLICT … WHERE clause so the
standby node only overwrites a deployed_configurations row when the
incoming deployed_at is strictly newer. The active-node path
(StoreDeployedConfigAsync) stays unguarded. Four deterministic tests
cover: no-row insert, older-overwrites, newer-is-noop, equal-is-noop;
all seed rows with explicit DateTimeOffset values via direct SQL to
avoid wall-clock timing dependencies.
This commit is contained in:
Joseph Doherty
2026-06-26 13:29:21 -04:00
parent 7a085444e4
commit caaa855362
2 changed files with 174 additions and 0 deletions
@@ -229,6 +229,71 @@ public class SiteStorageService
_logger.LogDebug("Stored deployed config for {Instance}, deploymentId={DeploymentId}", instanceName, deploymentId); _logger.LogDebug("Stored deployed config for {Instance}, deploymentId={DeploymentId}", instanceName, deploymentId);
} }
/// <summary>
/// Stores a deployed instance configuration only if the incoming write is strictly
/// newer than any already-stored row for the same instance. If no row exists the
/// write is always performed (insert path). Uses SQLite's conditional
/// <c>ON CONFLICT … WHERE excluded.deployed_at &gt; deployed_configurations.deployed_at</c>
/// clause so the guard is atomic with no application-level read-modify-write.
/// </summary>
/// <remarks>
/// This is the standby-node write path for replicated configs. The active-node
/// apply path (<see cref="StoreDeployedConfigAsync"/>) remains unguarded and always
/// overwrites, because the active node's write is always authoritative.
/// <para>
/// <paramref name="deployedAtOverride"/> is exposed for testing so that the exact
/// <c>deployed_at</c> value can be controlled without sleeping between calls.
/// Production callers omit it and receive the default <see cref="DateTimeOffset.UtcNow"/>
/// semantics, identical to <see cref="StoreDeployedConfigAsync"/>.
/// </para>
/// </remarks>
/// <param name="instanceName">The unique name of the instance.</param>
/// <param name="configJson">The deployed configuration as JSON.</param>
/// <param name="deploymentId">The unique deployment identifier.</param>
/// <param name="revisionHash">The configuration revision hash for staleness detection.</param>
/// <param name="isEnabled">Whether the instance is enabled.</param>
/// <param name="deployedAtOverride">
/// Optional explicit <c>deployed_at</c> timestamp. Defaults to
/// <see cref="DateTimeOffset.UtcNow"/> when <see langword="null"/>.
/// </param>
/// <returns>A task that completes when the conditional upsert has been executed.</returns>
public async Task StoreDeployedConfigIfNewerAsync(
string instanceName,
string configJson,
string deploymentId,
string revisionHash,
bool isEnabled,
DateTimeOffset? deployedAtOverride = null)
{
var deployedAt = (deployedAtOverride ?? DateTimeOffset.UtcNow).ToString("O");
await using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
await using var command = connection.CreateCommand();
command.CommandText = @"
INSERT INTO deployed_configurations
(instance_unique_name, config_json, deployment_id, revision_hash, is_enabled, deployed_at)
VALUES (@name, @json, @depId, @hash, @enabled, @deployedAt)
ON CONFLICT(instance_unique_name) DO UPDATE SET
config_json = excluded.config_json,
deployment_id = excluded.deployment_id,
revision_hash = excluded.revision_hash,
is_enabled = excluded.is_enabled,
deployed_at = excluded.deployed_at
WHERE excluded.deployed_at > deployed_configurations.deployed_at";
command.Parameters.AddWithValue("@name", instanceName);
command.Parameters.AddWithValue("@json", configJson);
command.Parameters.AddWithValue("@depId", deploymentId);
command.Parameters.AddWithValue("@hash", revisionHash);
command.Parameters.AddWithValue("@enabled", isEnabled ? 1 : 0);
command.Parameters.AddWithValue("@deployedAt", deployedAt);
await command.ExecuteNonQueryAsync();
_logger.LogDebug("StoreDeployedConfigIfNewer for {Instance}, deploymentId={DeploymentId}", instanceName, deploymentId);
}
/// <summary> /// <summary>
/// Removes a deployed instance configuration and its static overrides. /// Removes a deployed instance configuration and its static overrides.
/// </summary> /// </summary>
@@ -1,3 +1,4 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
@@ -194,4 +195,112 @@ public class SiteStorageServiceTests : IAsyncLifetime, IDisposable
Assert.Empty(configs); Assert.Empty(configs);
Assert.Empty(overrides); Assert.Empty(overrides);
} }
// ── Task 13: StoreDeployedConfigIfNewerAsync (guarded standby write) ──
/// <summary>
/// Seeds a deployed_configurations row with an explicit deployed_at timestamp using the same
/// "O" format the service uses, so tests can establish deterministic older/newer/equal rows.
/// </summary>
private async Task SeedDeployedConfigAsync(
string instanceName, string configJson, string deploymentId,
string revisionHash, bool isEnabled, DateTimeOffset deployedAt)
{
await using var conn = new SqliteConnection($"Data Source={_dbFile}");
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
INSERT INTO deployed_configurations
(instance_unique_name, config_json, deployment_id, revision_hash, is_enabled, deployed_at)
VALUES (@name, @json, @depId, @hash, @enabled, @deployedAt)
ON CONFLICT(instance_unique_name) DO UPDATE SET
config_json = excluded.config_json,
deployment_id = excluded.deployment_id,
revision_hash = excluded.revision_hash,
is_enabled = excluded.is_enabled,
deployed_at = excluded.deployed_at";
cmd.Parameters.AddWithValue("@name", instanceName);
cmd.Parameters.AddWithValue("@json", configJson);
cmd.Parameters.AddWithValue("@depId", deploymentId);
cmd.Parameters.AddWithValue("@hash", revisionHash);
cmd.Parameters.AddWithValue("@enabled", isEnabled ? 1 : 0);
cmd.Parameters.AddWithValue("@deployedAt", deployedAt.ToString("O"));
await cmd.ExecuteNonQueryAsync();
}
[Fact]
public async Task StoreDeployedConfigIfNewer_NoExistingRow_Inserts()
{
var at = DateTimeOffset.UtcNow;
await _storage.StoreDeployedConfigIfNewerAsync(
"Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", isEnabled: true, deployedAtOverride: at);
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.Single(configs);
Assert.Equal("Pump1", configs[0].InstanceUniqueName);
Assert.Equal("{\"v\":1}", configs[0].ConfigJson);
Assert.Equal("dep-001", configs[0].DeploymentId);
Assert.Equal("sha256:aaa", configs[0].RevisionHash);
Assert.True(configs[0].IsEnabled);
}
[Fact]
public async Task StoreDeployedConfigIfNewer_ExistingOlderRow_Overwrites()
{
var olderAt = new DateTimeOffset(2026, 1, 1, 10, 0, 0, TimeSpan.Zero);
var newerAt = new DateTimeOffset(2026, 1, 1, 11, 0, 0, TimeSpan.Zero);
await SeedDeployedConfigAsync("Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", true, olderAt);
await _storage.StoreDeployedConfigIfNewerAsync(
"Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", isEnabled: false, deployedAtOverride: newerAt);
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.Single(configs);
Assert.Equal("{\"v\":2}", configs[0].ConfigJson);
Assert.Equal("dep-002", configs[0].DeploymentId);
Assert.Equal("sha256:bbb", configs[0].RevisionHash);
Assert.False(configs[0].IsEnabled);
}
[Fact]
public async Task StoreDeployedConfigIfNewer_ExistingNewerRow_IsNoop()
{
var newerAt = new DateTimeOffset(2026, 6, 1, 12, 0, 0, TimeSpan.Zero);
var olderAt = new DateTimeOffset(2026, 5, 1, 12, 0, 0, TimeSpan.Zero);
// Seed the row that is already newer than what the standby would write
await SeedDeployedConfigAsync("Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", false, newerAt);
// Guarded write with an older timestamp — must be a NO-OP
await _storage.StoreDeployedConfigIfNewerAsync(
"Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", isEnabled: true, deployedAtOverride: olderAt);
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.Single(configs);
// The newer seeded row must survive unchanged
Assert.Equal("{\"v\":2}", configs[0].ConfigJson);
Assert.Equal("dep-002", configs[0].DeploymentId);
Assert.Equal("sha256:bbb", configs[0].RevisionHash);
Assert.False(configs[0].IsEnabled);
}
[Fact]
public async Task StoreDeployedConfigIfNewer_EqualDeployedAt_IsNoop()
{
var at = new DateTimeOffset(2026, 3, 15, 9, 30, 0, TimeSpan.Zero);
await SeedDeployedConfigAsync("Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", true, at);
// Guarded write with the IDENTICAL timestamp — must be a NO-OP (> not >=)
await _storage.StoreDeployedConfigIfNewerAsync(
"Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", isEnabled: false, deployedAtOverride: at);
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.Single(configs);
// Original row preserved
Assert.Equal("{\"v\":1}", configs[0].ConfigJson);
Assert.Equal("dep-001", configs[0].DeploymentId);
}
} }