using System.Collections.Concurrent;
using System.Reflection;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Admin.Hubs;
using ZB.MOM.WW.OtOpcUa.Admin.Services;

namespace ZB.MOM.WW.OtOpcUa.Admin.Tests;

/// <summary>
/// Regression for Admin-011 — <see cref="FleetStatusPoller"/> kept three plain
/// <c>Dictionary&lt;,&gt;</c> caches that were enumerated/mutated from the steady-state
/// poll loop and cleared from <c>ResetCache()</c> with no synchronization. A concurrent
/// <c>ResetCache()</c> during a poll iteration could throw
/// <see cref="InvalidOperationException"/> or corrupt the dictionary. The fix swaps the
/// caches for <see cref="ConcurrentDictionary{TKey,TValue}"/> so reset + concurrent
/// reads/writes are safe by construction.
/// </summary>
[Trait("Category", "Unit")]
public sealed class FleetStatusPollerConcurrencyTests
{
    /// <summary>Binding flags used for every reflective lookup of poller internals in this class.</summary>
    private const BindingFlags NonPublicInstance = BindingFlags.NonPublic | BindingFlags.Instance;

    /// <summary>
    /// Structural guardrail: asserts by reflection that the three cache fields are declared as
    /// <see cref="ConcurrentDictionary{TKey,TValue}"/>.
    /// </summary>
    [Fact]
    public void Cache_fields_are_thread_safe_collections()
    {
        // The fix uses ConcurrentDictionary; that makes ResetCache() and concurrent
        // poll-tick mutations safe by construction. Guard the structural choice with
        // reflection so a future refactor cannot silently revert to plain Dictionary
        // without flipping this guardrail.
        var fields = typeof(FleetStatusPoller)
            .GetFields(NonPublicInstance)
            .Where(f => f.Name is "_last" or "_lastRole" or "_lastResilience")
            .ToList();

        fields.Count.ShouldBe(3, "expected the three cache fields _last/_lastRole/_lastResilience to exist");

        foreach (var f in fields)
        {
            var type = f.FieldType;

            type.IsGenericType.ShouldBeTrue($"{f.Name} should be a generic concurrent collection");
            type.GetGenericTypeDefinition().ShouldBe(
                typeof(ConcurrentDictionary<,>),
                customMessage: $"{f.Name} must be a ConcurrentDictionary<,> so concurrent ResetCache()/poll calls are safe — plain Dictionary regressed Admin-011.");
        }
    }

    /// <summary>
    /// Stress test: one task writes into the <c>_last</c> cache through its indexer while another
    /// invokes <c>ResetCache()</c> in a loop, for roughly two seconds. The test passes when
    /// neither task faults.
    /// </summary>
    [Fact]
    public void ResetCache_is_safe_to_call_concurrently_with_cache_mutations()
    {
        // Stress test — hammer the cache with mutate/clear concurrently. With plain
        // Dictionary this throws InvalidOperationException ("Collection was modified")
        // or corrupts internal state. With ConcurrentDictionary it must complete cleanly.
        var poller = BuildPollerForReflectionTest();

        var lastField = typeof(FleetStatusPoller).GetField("_last", NonPublicInstance)!;
        var cache = lastField.GetValue(poller)!;
        var cacheType = cache.GetType();
        var indexer = cacheType.GetProperty("Item")!;

        // Second generic argument is the cached value type (a NodeStateSnapshot record-struct
        // per the original author's note); a default instance is enough for a write workload.
        var valueType = cacheType.GetGenericArguments()[1];
        var defaultSnapshot = Activator.CreateInstance(valueType)!;

        // Resolve ResetCache up front so a rename fails here with a clear message instead of
        // as a NullReferenceException buried inside the background task's AggregateException.
        var resetCache = typeof(FleetStatusPoller).GetMethod("ResetCache", NonPublicInstance)
            ?? throw new InvalidOperationException(
                "FleetStatusPoller.ResetCache() was not found as a non-public instance method.");

        // The timeout constructor arms an internal timer; dispose the source when the test ends.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

        var writer = Task.Run(() =>
        {
            var i = 0;
            while (!cts.IsCancellationRequested)
            {
                // Cycle through 64 keys so writes repeatedly hit entries the resetter clears.
                indexer.SetValue(cache, defaultSnapshot, new object[] { $"node-{i++ % 64}" });
            }
        });

        var resetter = Task.Run(() =>
        {
            while (!cts.IsCancellationRequested)
            {
                resetCache.Invoke(poller, null);
            }
        });

        // Should not throw — the whole point is that the two run concurrently safely.
        Should.NotThrow(() => Task.WaitAll([writer, resetter]));
    }

    /// <summary>
    /// Builds a <see cref="FleetStatusPoller"/> wired to throwing stubs, suitable only for
    /// reflection-based access to its caches and <c>ResetCache()</c>.
    /// </summary>
    private static FleetStatusPoller BuildPollerForReflectionTest()
    {
        // Pass null-style stubs — the poller constructor doesn't touch them and we
        // never call ExecuteAsync/PollOnceAsync here (those need a real DB context).
        // We only exercise ResetCache + cache mutation by reflection.
        var scopeFactory = new StubServiceScopeFactory();

        // NOTE(review): hub type arguments are assumed from the local names
        // (fleetHub/alertHub) — confirm they match the FleetStatusPoller constructor.
        var fleetHub = new StubHubContext<FleetStatusHub>();
        var alertHub = new StubHubContext<AlertHub>();

        return new FleetStatusPoller(
            scopeFactory,
            fleetHub,
            alertHub,
            NullLogger<FleetStatusPoller>.Instance,
            new RedundancyMetrics());
    }

    /// <summary>Scope factory stub; <see cref="CreateScope"/> throws if it is ever called.</summary>
    private sealed class StubServiceScopeFactory : IServiceScopeFactory
    {
        public IServiceScope CreateScope() => throw new NotImplementedException();
    }

    /// <summary>Hub context stub; both members throw if they are ever read.</summary>
    /// <typeparam name="THub">Hub type the context is registered for.</typeparam>
    private sealed class StubHubContext<THub> : IHubContext<THub>
        where THub : Hub
    {
        public IHubClients Clients => throw new NotImplementedException();

        public IGroupManager Groups => throw new NotImplementedException();
    }
}