176 lines
7.7 KiB
C#
176 lines
7.7 KiB
C#
using Akka.Actor;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using ScadaLink.AuditLog.Central;
|
|
using ScadaLink.AuditLog.Site;
|
|
using ScadaLink.AuditLog.Site.Telemetry;
|
|
using ScadaLink.Commons.Interfaces;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.ConfigurationDatabase;
|
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
|
using ScadaLink.SiteRuntime.Tracking;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
|
|
|
/// <summary>
|
|
/// Shared end-to-end harness for the M3 cached-call combined telemetry tests
|
|
/// (G2/G3/G4). Composes the full pipeline:
|
|
/// <list type="bullet">
|
|
/// <item><description>Site-local SQLite <see cref="SqliteAuditWriter"/> (in-memory) +
|
|
/// <see cref="RingBufferFallback"/> + <see cref="FallbackAuditWriter"/>.</description></item>
|
|
/// <item><description>Site-local SQLite <see cref="OperationTrackingStore"/> (in-memory).</description></item>
|
|
/// <item><description>Production <see cref="CachedCallTelemetryForwarder"/> wrapped by a
|
|
/// test-side <see cref="CombinedTelemetryDispatcher"/> that also pushes each
|
|
/// packet through the stub gRPC client.</description></item>
|
|
/// <item><description><see cref="CachedCallLifecycleBridge"/> wired to the
|
|
/// dispatcher so a single observer call fans out audit + tracking + wire.</description></item>
|
|
/// <item><description><see cref="DirectActorSiteStreamAuditClient"/> connected
|
|
/// to an <see cref="AuditLogIngestActor"/> backed by the real
|
|
/// <see cref="AuditLogRepository"/> + <see cref="SiteCallAuditRepository"/>
|
|
/// against the per-test <see cref="MsSqlMigrationFixture"/> database.</description></item>
|
|
/// </list>
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// Disposal cleans up the in-memory SQLite stores. The Akka actor system is
|
|
/// owned by the calling <see cref="Akka.TestKit.Xunit2.TestKit"/>; the harness
|
|
/// only owns the ingest actor IActorRef and the underlying repositories'
|
|
/// DbContext lifecycle.
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class CombinedTelemetryHarness : IAsyncDisposable
|
|
{
|
|
public SqliteAuditWriter SqliteWriter { get; }
|
|
public RingBufferFallback Ring { get; }
|
|
public FallbackAuditWriter FallbackWriter { get; }
|
|
public OperationTrackingStore TrackingStore { get; }
|
|
public CachedCallTelemetryForwarder InnerForwarder { get; }
|
|
public CombinedTelemetryDispatcher Dispatcher { get; }
|
|
public CachedCallLifecycleBridge Bridge { get; }
|
|
public DirectActorSiteStreamAuditClient StubClient { get; }
|
|
public IActorRef IngestActor { get; }
|
|
public IServiceProvider ServiceProvider { get; }
|
|
|
|
private readonly MsSqlMigrationFixture _fixture;
|
|
private bool _disposed;
|
|
|
|
public CombinedTelemetryHarness(
|
|
MsSqlMigrationFixture fixture,
|
|
Akka.TestKit.Xunit2.TestKit testKit,
|
|
Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride = null)
|
|
{
|
|
_fixture = fixture ?? throw new ArgumentNullException(nameof(fixture));
|
|
ArgumentNullException.ThrowIfNull(testKit);
|
|
|
|
// Site SQLite — unique in-memory database per harness so tests don't share
|
|
// an audit queue. Mode=Memory + Cache=Shared keeps the file alive for the
|
|
// lifetime of the writer connection.
|
|
SqliteWriter = new SqliteAuditWriter(
|
|
Options.Create(new SqliteAuditWriterOptions
|
|
{
|
|
DatabasePath = "ignored",
|
|
BatchSize = 64,
|
|
ChannelCapacity = 1024,
|
|
}),
|
|
NullLogger<SqliteAuditWriter>.Instance,
|
|
connectionStringOverride:
|
|
$"Data Source=file:cachedcall-g-{Guid.NewGuid():N}?mode=memory&cache=shared");
|
|
|
|
Ring = new RingBufferFallback();
|
|
FallbackWriter = new FallbackAuditWriter(
|
|
SqliteWriter, Ring, new NoOpAuditWriteFailureCounter(),
|
|
NullLogger<FallbackAuditWriter>.Instance);
|
|
|
|
TrackingStore = new OperationTrackingStore(
|
|
Options.Create(new OperationTrackingOptions
|
|
{
|
|
// Same shared-in-memory pattern as the audit writer.
|
|
ConnectionString =
|
|
$"Data Source=file:tracking-g-{Guid.NewGuid():N}?mode=memory&cache=shared",
|
|
}),
|
|
NullLogger<OperationTrackingStore>.Instance);
|
|
|
|
// Central wiring: real repositories backed by the MSSQL fixture's DB.
|
|
ServiceProvider = BuildCentralServiceProvider(siteCallRepoOverride);
|
|
IngestActor = testKit.Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
|
|
ServiceProvider,
|
|
NullLogger<AuditLogIngestActor>.Instance)));
|
|
|
|
StubClient = new DirectActorSiteStreamAuditClient(IngestActor);
|
|
|
|
// Production forwarder writes the local stores; the dispatcher wraps
|
|
// it to ALSO push the same packet to central via the stub client.
|
|
InnerForwarder = new CachedCallTelemetryForwarder(
|
|
FallbackWriter, TrackingStore, NullLogger<CachedCallTelemetryForwarder>.Instance);
|
|
Dispatcher = new CombinedTelemetryDispatcher(InnerForwarder, StubClient);
|
|
|
|
Bridge = new CachedCallLifecycleBridge(Dispatcher, NullLogger<CachedCallLifecycleBridge>.Instance);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Convenience: emit the initial submit packet directly through the
|
|
/// dispatcher (the bridge's hooks fire only for S&F retry-loop
|
|
/// attempts; submit-row emission happens at the script call site).
|
|
/// </summary>
|
|
public Task EmitSubmitAsync(CachedCallTelemetry submit, CancellationToken ct = default) =>
|
|
Dispatcher.ForwardAsync(submit, ct);
|
|
|
|
/// <summary>
|
|
/// Convenience: route a per-attempt or terminal outcome through the bridge.
|
|
/// </summary>
|
|
public Task EmitAttemptAsync(CachedCallAttemptContext context, CancellationToken ct = default) =>
|
|
Bridge.OnAttemptCompletedAsync(context, ct);
|
|
|
|
public ScadaLinkDbContext CreateReadContext()
|
|
{
|
|
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
|
.UseSqlServer(_fixture.ConnectionString)
|
|
.Options;
|
|
return new ScadaLinkDbContext(options);
|
|
}
|
|
|
|
private IServiceProvider BuildCentralServiceProvider(
|
|
Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride)
|
|
{
|
|
var services = new ServiceCollection();
|
|
services.AddDbContext<ScadaLinkDbContext>(opts =>
|
|
opts.UseSqlServer(_fixture.ConnectionString)
|
|
.ConfigureWarnings(w => w.Ignore(
|
|
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));
|
|
services.AddScoped<IAuditLogRepository>(sp =>
|
|
new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
|
|
if (siteCallRepoOverride is null)
|
|
{
|
|
services.AddScoped<ISiteCallAuditRepository>(sp =>
|
|
new SiteCallAuditRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
|
|
}
|
|
else
|
|
{
|
|
services.AddScoped(sp =>
|
|
siteCallRepoOverride(sp.GetRequiredService<ScadaLinkDbContext>()));
|
|
}
|
|
return services.BuildServiceProvider();
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
await SqliteWriter.DisposeAsync().ConfigureAwait(false);
|
|
await TrackingStore.DisposeAsync().ConfigureAwait(false);
|
|
if (ServiceProvider is IAsyncDisposable asyncSp)
|
|
{
|
|
await asyncSp.DisposeAsync().ConfigureAwait(false);
|
|
}
|
|
else if (ServiceProvider is IDisposable sp)
|
|
{
|
|
sp.Dispose();
|
|
}
|
|
}
|
|
}
|