154 lines
6.4 KiB
C#
154 lines
6.4 KiB
C#
using System.Collections;
|
|
using System.Reflection;
|
|
using System.Security.Claims;
|
|
using Bunit;
|
|
using Microsoft.AspNetCore.Components.Authorization;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using NSubstitute;
|
|
using ScadaLink.CentralUI.Auth;
|
|
using ScadaLink.Commons.Entities.Sites;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Messages.Streaming;
|
|
using ScadaLink.Communication;
|
|
using ScadaLink.Communication.Grpc;
|
|
using DebugViewPage = ScadaLink.CentralUI.Components.Pages.Deployment.DebugView;
|
|
|
|
namespace ScadaLink.CentralUI.Tests.Deployment;
|
|
|
|
/// <summary>
|
|
/// Regression tests for CentralUI-021. The <c>DebugView</c> stream callback runs
|
|
/// on an Akka/gRPC thread; it used to call <c>UpsertWithCap</c> directly on that
|
|
/// thread, mutating the <c>_attributeValues</c>/<c>_alarmStates</c>
|
|
/// <see cref="Dictionary{TKey,TValue}"/> while the render thread enumerated the
|
|
/// same dictionaries via <c>FilteredAttributeValues</c>. <c>Dictionary</c> is
|
|
/// not thread-safe, so the write could throw "Collection was modified" or
|
|
/// corrupt the buckets. The fix routes the callback through
|
|
/// <c>HandleStreamEvent</c>, which marshals the mutation onto the renderer's
|
|
/// dispatcher so every dictionary access happens on one thread.
|
|
/// </summary>
|
|
public class DebugViewStreamRaceTests : BunitContext
|
|
{
|
|
private IRenderedComponent<DebugViewPage> RenderPage()
|
|
{
|
|
JSInterop.Mode = JSRuntimeMode.Loose;
|
|
|
|
var repo = Substitute.For<ITemplateEngineRepository>();
|
|
var siteRepo = Substitute.For<ISiteRepository>();
|
|
siteRepo.GetAllSitesAsync().Returns(new List<Site>());
|
|
Services.AddSingleton(repo);
|
|
Services.AddSingleton(siteRepo);
|
|
|
|
var comms = new CommunicationService(
|
|
Options.Create(new CommunicationOptions()),
|
|
NullLogger<CommunicationService>.Instance);
|
|
Services.AddSingleton(comms);
|
|
|
|
var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
|
|
var debugStream = new DebugStreamService(
|
|
comms, Services.BuildServiceProvider(), grpcFactory,
|
|
NullLogger<DebugStreamService>.Instance);
|
|
Services.AddSingleton(debugStream);
|
|
|
|
var identity = new ClaimsIdentity(
|
|
new[] { new Claim(ClaimTypes.Name, "deployer") }, "TestCookie");
|
|
var stubAuth = new StubAuthStateProvider(
|
|
new AuthenticationState(new ClaimsPrincipal(identity)));
|
|
Services.AddSingleton<AuthenticationStateProvider>(stubAuth);
|
|
Services.AddScoped(_ => new SiteScopeService(stubAuth));
|
|
|
|
return Render<DebugViewPage>();
|
|
}
|
|
|
|
private sealed class StubAuthStateProvider : AuthenticationStateProvider
|
|
{
|
|
private readonly AuthenticationState _state;
|
|
public StubAuthStateProvider(AuthenticationState state) => _state = state;
|
|
public override Task<AuthenticationState> GetAuthenticationStateAsync()
|
|
=> Task.FromResult(_state);
|
|
}
|
|
|
|
private static MethodInfo HandleStreamEvent => typeof(DebugViewPage).GetMethod(
|
|
"HandleStreamEvent", BindingFlags.Instance | BindingFlags.NonPublic)!;
|
|
|
|
private static IDictionary AttributeValues(DebugViewPage c) => (IDictionary)
|
|
typeof(DebugViewPage).GetField("_attributeValues",
|
|
BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(c)!;
|
|
|
|
private static IEnumerable FilteredAttributeValues(DebugViewPage c) => (IEnumerable)
|
|
typeof(DebugViewPage).GetProperty("FilteredAttributeValues",
|
|
BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(c)!;
|
|
|
|
[Fact]
|
|
public void HandleStreamEvent_AppliesUpdate_OnceDispatcherRuns()
|
|
{
|
|
// The fix defers the mutation onto the dispatcher — it must not drop it.
|
|
var cut = RenderPage();
|
|
var dict = AttributeValues(cut.Instance);
|
|
|
|
var evt = new AttributeValueChanged(
|
|
"Inst-1", "Pump.Speed", "Speed", 42, "Good", DateTimeOffset.UtcNow);
|
|
HandleStreamEvent.Invoke(cut.Instance, new object[] { evt });
|
|
|
|
cut.WaitForState(() => dict.Count == 1, TimeSpan.FromSeconds(2));
|
|
Assert.True(dict.Contains("Speed"));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HandleStreamEvent_OffThreadEvents_DoNotFaultDispatcherReads()
|
|
{
|
|
// CentralUI-021 reproduction. Writers fire stream events from background
|
|
// threads (the Akka/gRPC callback threads). The reader enumerates
|
|
// FilteredAttributeValues *through the renderer's dispatcher* — exactly
|
|
// as the real render thread does. Pre-fix the writers mutated the
|
|
// Dictionary directly on their own threads, racing the dispatcher-side
|
|
// enumeration and intermittently throwing "Collection was modified".
|
|
// Post-fix every write is marshalled onto the dispatcher, so writes and
|
|
// reads are serialised on one thread and the enumeration never faults.
|
|
var cut = RenderPage();
|
|
var dict = AttributeValues(cut.Instance);
|
|
|
|
Exception? failure = null;
|
|
using var stop = new CancellationTokenSource();
|
|
|
|
var writers = Enumerable.Range(0, 4).Select(w => Task.Run(() =>
|
|
{
|
|
try
|
|
{
|
|
for (var i = 0; i < 600 && !stop.IsCancellationRequested; i++)
|
|
{
|
|
var evt = new AttributeValueChanged(
|
|
"Inst-1", $"Tag.{w}.{i}", $"Tag-{w}-{i}",
|
|
i, "Good", DateTimeOffset.UtcNow);
|
|
HandleStreamEvent.Invoke(cut.Instance, new object[] { evt });
|
|
}
|
|
}
|
|
catch (Exception ex) { failure ??= ex; stop.Cancel(); }
|
|
})).ToArray();
|
|
|
|
var reader = Task.Run(async () =>
|
|
{
|
|
try
|
|
{
|
|
while (!stop.IsCancellationRequested)
|
|
{
|
|
await cut.InvokeAsync(() =>
|
|
{
|
|
foreach (var _ in FilteredAttributeValues(cut.Instance)) { }
|
|
});
|
|
}
|
|
}
|
|
catch (Exception ex) { failure ??= ex; stop.Cancel(); }
|
|
});
|
|
|
|
await Task.WhenAll(writers);
|
|
stop.Cancel();
|
|
await reader.WaitAsync(TimeSpan.FromSeconds(5));
|
|
|
|
Assert.Null(failure);
|
|
// Sanity: events were actually delivered (cap is honoured separately).
|
|
cut.WaitForState(() => dict.Count > 0, TimeSpan.FromSeconds(2));
|
|
}
|
|
}
|