From caaa855362e2ca2f44a9a604af12df9d268d46bd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 13:29:21 -0400 Subject: [PATCH] feat(site): older-write guard for replicated config writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add StoreDeployedConfigIfNewerAsync to SiteStorageService — an atomic conditional upsert using SQLite's ON CONFLICT … WHERE clause so the standby node only overwrites a deployed_configurations row when the incoming deployed_at is strictly newer. The active-node path (StoreDeployedConfigAsync) stays unguarded. Four deterministic tests cover: no-row insert, older-overwrites, newer-is-noop, equal-is-noop; all seed rows with explicit DateTimeOffset values via direct SQL to avoid wall-clock timing dependencies. --- .../Persistence/SiteStorageService.cs | 65 +++++++++++ .../Persistence/SiteStorageServiceTests.cs | 109 ++++++++++++++++++ 2 files changed, 174 insertions(+) diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs index fd25814a..910d1b8b 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs @@ -229,6 +229,71 @@ public class SiteStorageService _logger.LogDebug("Stored deployed config for {Instance}, deploymentId={DeploymentId}", instanceName, deploymentId); } + /// + /// Stores a deployed instance configuration only if the incoming write is strictly + /// newer than any already-stored row for the same instance. If no row exists the + /// write is always performed (insert path). Uses SQLite's conditional + /// ON CONFLICT … WHERE excluded.deployed_at > deployed_configurations.deployed_at + /// clause so the guard is atomic with no application-level read-modify-write. + /// + /// + /// This is the standby-node write path for replicated configs. The active-node + /// apply path () remains unguarded and always + /// overwrites, because the active node's write is always authoritative. + /// + /// is exposed for testing so that the exact + /// deployed_at value can be controlled without sleeping between calls. + /// Production callers omit it and receive the default + /// semantics, identical to . + /// + /// + /// The unique name of the instance. + /// The deployed configuration as JSON. + /// The unique deployment identifier. + /// The configuration revision hash for staleness detection. + /// Whether the instance is enabled. + /// + /// Optional explicit deployed_at timestamp. Defaults to + /// when . + /// + /// A task that completes when the conditional upsert has been executed. + public async Task StoreDeployedConfigIfNewerAsync( + string instanceName, + string configJson, + string deploymentId, + string revisionHash, + bool isEnabled, + DateTimeOffset? deployedAtOverride = null) + { + var deployedAt = (deployedAtOverride ?? DateTimeOffset.UtcNow).ToString("O"); + + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var command = connection.CreateCommand(); + command.CommandText = @" + INSERT INTO deployed_configurations + (instance_unique_name, config_json, deployment_id, revision_hash, is_enabled, deployed_at) + VALUES (@name, @json, @depId, @hash, @enabled, @deployedAt) + ON CONFLICT(instance_unique_name) DO UPDATE SET + config_json = excluded.config_json, + deployment_id = excluded.deployment_id, + revision_hash = excluded.revision_hash, + is_enabled = excluded.is_enabled, + deployed_at = excluded.deployed_at + WHERE excluded.deployed_at > deployed_configurations.deployed_at"; + + command.Parameters.AddWithValue("@name", instanceName); + command.Parameters.AddWithValue("@json", configJson); + command.Parameters.AddWithValue("@depId", deploymentId); + command.Parameters.AddWithValue("@hash", revisionHash); + command.Parameters.AddWithValue("@enabled", isEnabled ? 1 : 0); + command.Parameters.AddWithValue("@deployedAt", deployedAt); + + await command.ExecuteNonQueryAsync(); + _logger.LogDebug("StoreDeployedConfigIfNewer for {Instance}, deploymentId={DeploymentId}", instanceName, deploymentId); + } + /// /// Removes a deployed instance configuration and its static overrides. /// diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Persistence/SiteStorageServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Persistence/SiteStorageServiceTests.cs index fe51198c..8adcb42d 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Persistence/SiteStorageServiceTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Persistence/SiteStorageServiceTests.cs @@ -1,3 +1,4 @@ +using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; @@ -194,4 +195,112 @@ public class SiteStorageServiceTests : IAsyncLifetime, IDisposable Assert.Empty(configs); Assert.Empty(overrides); } + + // ── Task 13: StoreDeployedConfigIfNewerAsync (guarded standby write) ── + + /// + /// Seeds a deployed_configurations row with an explicit deployed_at timestamp using the same + /// "O" format the service uses, so tests can establish deterministic older/newer/equal rows. + /// + private async Task SeedDeployedConfigAsync( + string instanceName, string configJson, string deploymentId, + string revisionHash, bool isEnabled, DateTimeOffset deployedAt) + { + await using var conn = new SqliteConnection($"Data Source={_dbFile}"); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = @" + INSERT INTO deployed_configurations + (instance_unique_name, config_json, deployment_id, revision_hash, is_enabled, deployed_at) + VALUES (@name, @json, @depId, @hash, @enabled, @deployedAt) + ON CONFLICT(instance_unique_name) DO UPDATE SET + config_json = excluded.config_json, + deployment_id = excluded.deployment_id, + revision_hash = excluded.revision_hash, + is_enabled = excluded.is_enabled, + deployed_at = excluded.deployed_at"; + cmd.Parameters.AddWithValue("@name", instanceName); + cmd.Parameters.AddWithValue("@json", configJson); + cmd.Parameters.AddWithValue("@depId", deploymentId); + cmd.Parameters.AddWithValue("@hash", revisionHash); + cmd.Parameters.AddWithValue("@enabled", isEnabled ? 1 : 0); + cmd.Parameters.AddWithValue("@deployedAt", deployedAt.ToString("O")); + await cmd.ExecuteNonQueryAsync(); + } + + [Fact] + public async Task StoreDeployedConfigIfNewer_NoExistingRow_Inserts() + { + var at = DateTimeOffset.UtcNow; + + await _storage.StoreDeployedConfigIfNewerAsync( + "Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", isEnabled: true, deployedAtOverride: at); + + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Single(configs); + Assert.Equal("Pump1", configs[0].InstanceUniqueName); + Assert.Equal("{\"v\":1}", configs[0].ConfigJson); + Assert.Equal("dep-001", configs[0].DeploymentId); + Assert.Equal("sha256:aaa", configs[0].RevisionHash); + Assert.True(configs[0].IsEnabled); + } + + [Fact] + public async Task StoreDeployedConfigIfNewer_ExistingOlderRow_Overwrites() + { + var olderAt = new DateTimeOffset(2026, 1, 1, 10, 0, 0, TimeSpan.Zero); + var newerAt = new DateTimeOffset(2026, 1, 1, 11, 0, 0, TimeSpan.Zero); + + await SeedDeployedConfigAsync("Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", true, olderAt); + + await _storage.StoreDeployedConfigIfNewerAsync( + "Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", isEnabled: false, deployedAtOverride: newerAt); + + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Single(configs); + Assert.Equal("{\"v\":2}", configs[0].ConfigJson); + Assert.Equal("dep-002", configs[0].DeploymentId); + Assert.Equal("sha256:bbb", configs[0].RevisionHash); + Assert.False(configs[0].IsEnabled); + } + + [Fact] + public async Task StoreDeployedConfigIfNewer_ExistingNewerRow_IsNoop() + { + var newerAt = new DateTimeOffset(2026, 6, 1, 12, 0, 0, TimeSpan.Zero); + var olderAt = new DateTimeOffset(2026, 5, 1, 12, 0, 0, TimeSpan.Zero); + + // Seed the row that is already newer than what the standby would write + await SeedDeployedConfigAsync("Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", false, newerAt); + + // Guarded write with an older timestamp — must be a NO-OP + await _storage.StoreDeployedConfigIfNewerAsync( + "Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", isEnabled: true, deployedAtOverride: olderAt); + + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Single(configs); + // The newer seeded row must survive unchanged + Assert.Equal("{\"v\":2}", configs[0].ConfigJson); + Assert.Equal("dep-002", configs[0].DeploymentId); + Assert.Equal("sha256:bbb", configs[0].RevisionHash); + Assert.False(configs[0].IsEnabled); + } + + [Fact] + public async Task StoreDeployedConfigIfNewer_EqualDeployedAt_IsNoop() + { + var at = new DateTimeOffset(2026, 3, 15, 9, 30, 0, TimeSpan.Zero); + + await SeedDeployedConfigAsync("Pump1", "{\"v\":1}", "dep-001", "sha256:aaa", true, at); + + // Guarded write with the IDENTICAL timestamp — must be a NO-OP (> not >=) + await _storage.StoreDeployedConfigIfNewerAsync( + "Pump1", "{\"v\":2}", "dep-002", "sha256:bbb", isEnabled: false, deployedAtOverride: at); + + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Single(configs); + // Original row preserved + Assert.Equal("{\"v\":1}", configs[0].ConfigJson); + Assert.Equal("dep-001", configs[0].DeploymentId); + } }