fix(scripted-alarms): resolve High code-review finding (Core.ScriptedAlarms-001)
_alarms was a plain Dictionary<string, AlarmState> mutated under the _evalGate semaphore, but four read paths (GetState, GetAllStates, the LoadedAlarmIds property, and RunShelvingCheck) touched it from arbitrary threads with no synchronisation. A Dictionary read concurrent with a writer's entry reassignment can throw InvalidOperationException or return torn state.

Switched _alarms to ConcurrentDictionary<string, AlarmState>. The only write shapes are indexer-set and Clear, both atomic on ConcurrentDictionary, so all mutations stay correct without further change; reads now get safe snapshot semantics. LoadedAlarmIds materialises the key snapshot to keep its IReadOnlyCollection<string> return type. This matches _valueCache, which is already a ConcurrentDictionary.

Added a regression test (Concurrent_reads_during_mutation_do_not_throw) that hammers the engine with state mutations while four reader threads continuously call the three unguarded read paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -303,6 +303,67 @@ public sealed class ScriptedAlarmEngineTests
|
||||
up.ActiveSubscriptionCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
public async Task Concurrent_reads_during_mutation_do_not_throw(/* Core.ScriptedAlarms-001 */)
{
    // Regression for Core.ScriptedAlarms-001: GetState / GetAllStates /
    // LoadedAlarmIds touch _alarms from arbitrary threads with no lock while
    // ApplyAsync / ReevaluateAsync reassign dictionary entries under _evalGate.
    // With a plain Dictionary this race throws InvalidOperationException or
    // returns torn state; with a ConcurrentDictionary the reads are safe.
    const int alarmCount = 40;
    const int readerCount = 4;
    var testToken = TestContext.Current.CancellationToken;

    // Arrange: one upstream tag and one alarm per slot, all starting below the threshold.
    var up = new FakeUpstream();
    var defs = new List<ScriptedAlarmDefinition>();
    for (var i = 0; i < alarmCount; i++)
    {
        up.Set($"Temp{i}", 50);
        defs.Add(Alarm($"A{i}", $$"""return (int)ctx.GetTag("Temp{{i}}").Value > 100;"""));
    }

    using var eng = Build(up, out _);
    await eng.LoadAsync(defs, testToken);

    // Time budget for the race window. Linked to the test token so that an
    // aborted test run stops the loops instead of spinning for the full budget.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(testToken);
    cts.CancelAfter(TimeSpan.FromSeconds(2));

    // First exception observed by any reader; written via Interlocked so a later
    // failure on another thread cannot overwrite the original one.
    Exception? readerFailure = null;

    // Writer: hammer the engine with state mutations that reassign _alarms entries.
    var writer = Task.Run(async () =>
    {
        try
        {
            var round = 0;
            while (!cts.IsCancellationRequested)
            {
                var slot = round % alarmCount;

                // Not cts.Token: the loop ends via the flag above; cancelling a call
                // in flight when the budget expires would presumably surface as an
                // OperationCanceledException and fail the test for the wrong reason.
                await eng.AddCommentAsync($"A{slot}", "tester", $"r{round}", CancellationToken.None);
                up.Push($"Temp{slot}", round % 2 == 0 ? 150 : 50);
                round++;
            }
        }
        finally
        {
            // If the writer faults, release the readers immediately; the fault
            // itself is rethrown by Task.WhenAll below.
            cts.Cancel();
        }
    });

    // Readers: continuously touch the three unguarded read paths.
    // The lambda parameter keeps a real name on purpose: a single parameter named
    // `_` would be an int identifier and break the `_ = ...` discards below.
    var readers = Enumerable.Range(0, readerCount).Select(_reader => Task.Run(() =>
    {
        try
        {
            while (!cts.IsCancellationRequested)
            {
                _ = eng.LoadedAlarmIds.Count;
                _ = eng.GetAllStates().Count;
                for (var i = 0; i < alarmCount; i++)
                {
                    _ = eng.GetState($"A{i}");
                }
            }
        }
        catch (Exception ex)
        {
            // Keep only the first failure and stop every other loop right away.
            Interlocked.CompareExchange(ref readerFailure, ex, null);
            cts.Cancel();
        }
    })).ToArray();

    await Task.WhenAll([writer, .. readers]);

    readerFailure.ShouldBeNull(
        "concurrent reads of _alarms while it is being mutated must not throw");
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> cond, int timeoutMs = 2000)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
|
||||
|
||||
Reference in New Issue
Block a user