using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.SiteRuntime.Tracking;
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
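/// <summary>
/// Shared end-to-end harness for the M3 cached-call combined telemetry tests
/// (G2/G3/G4). Composes the full pipeline:
/// <list type="bullet">
/// <item><description>Site-local SQLite (in-memory) <see cref="SqliteAuditWriter"/> +
/// <see cref="RingBufferFallback"/> + <see cref="FallbackAuditWriter"/>.</description></item>
/// <item><description>Site-local SQLite (in-memory) <see cref="OperationTrackingStore"/>.</description></item>
/// <item><description>Production <see cref="CachedCallTelemetryForwarder"/> wrapped by a
/// test-side <see cref="CombinedTelemetryDispatcher"/> that also pushes each
/// packet through the stub gRPC client.</description></item>
/// <item><description><see cref="CachedCallLifecycleBridge"/> wired to the
/// dispatcher so a single observer call fans out audit + tracking + wire.</description></item>
/// <item><description><see cref="DirectActorSiteStreamAuditClient"/> connected
/// to an <see cref="AuditLogIngestActor"/> backed by the real
/// <see cref="AuditLogRepository"/> + <see cref="SiteCallAuditRepository"/>
/// against the per-test <see cref="MsSqlMigrationFixture"/> database.</description></item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// Disposal cleans up the in-memory SQLite stores. The Akka actor system is
/// owned by the calling <see cref="Akka.TestKit.Xunit2.TestKit"/>; the harness
/// only owns the ingest actor IActorRef and the underlying repositories'
/// DbContext lifecycle.
/// </para>
/// </remarks>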
public sealed class CombinedTelemetryHarness : IAsyncDisposable
{
    /// <summary>Site-side audit writer, opened on a per-harness shared in-memory SQLite database.</summary>
    public SqliteAuditWriter SqliteWriter { get; }

    /// <summary>Ring-buffer fallback handed to <see cref="FallbackWriter"/>.</summary>
    public RingBufferFallback Ring { get; }

    /// <summary>Audit writer composed from <see cref="SqliteWriter"/> and <see cref="Ring"/>; this is the writer the forwarder uses.</summary>
    public FallbackAuditWriter FallbackWriter { get; }

    /// <summary>Operation tracking store, opened on its own per-harness shared in-memory SQLite database.</summary>
    public OperationTrackingStore TrackingStore { get; }

    /// <summary>Forwarder constructed over <see cref="FallbackWriter"/> and <see cref="TrackingStore"/>.</summary>
    public CachedCallTelemetryForwarder InnerForwarder { get; }

    /// <summary>Dispatcher constructed over <see cref="InnerForwarder"/> and <see cref="StubClient"/>.</summary>
    public CombinedTelemetryDispatcher Dispatcher { get; }

    /// <summary>Lifecycle bridge constructed over <see cref="Dispatcher"/>.</summary>
    public CachedCallLifecycleBridge Bridge { get; }

    /// <summary>Stub client that targets <see cref="IngestActor"/> directly.</summary>
    public DirectActorSiteStreamAuditClient StubClient { get; }

    /// <summary>The <c>AuditLogIngestActor</c> created in the TestKit's actor system.</summary>
    public IActorRef IngestActor { get; }

    /// <summary>Central-side container holding the DbContext and repository registrations for the fixture database.</summary>
    public IServiceProvider ServiceProvider { get; }

    private readonly MsSqlMigrationFixture _fixture;
    private bool _disposed;

    /// <summary>
    /// Builds the whole site-to-central pipeline: SQLite stores, central
    /// service provider, ingest actor, stub client, forwarder, dispatcher and
    /// bridge, in that order.
    /// </summary>
    /// <param name="fixture">Supplies the connection string of the per-test central database.</param>
    /// <param name="testKit">TestKit whose actor system hosts the ingest actor.</param>
    /// <param name="siteCallRepoOverride">
    /// Optional factory used in place of the default <c>SiteCallAuditRepository</c>
    /// registration. It is invoked with the service resolved from the scope and
    /// its result becomes the scoped registration.
    /// NOTE(review): the delegate's type arguments are not visible in this view — confirm.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="fixture"/> or <paramref name="testKit"/> is <c>null</c>.
    /// </exception>
    public CombinedTelemetryHarness(
        MsSqlMigrationFixture fixture,
        Akka.TestKit.Xunit2.TestKit testKit,
        Func? siteCallRepoOverride = null)
    {
        _fixture = fixture ?? throw new ArgumentNullException(nameof(fixture));
        ArgumentNullException.ThrowIfNull(testKit);

        // Site SQLite — unique in-memory database per harness so tests don't share
        // an audit queue. Mode=Memory + Cache=Shared keeps the file alive for the
        // lifetime of the writer connection.
        SqliteWriter = new SqliteAuditWriter(
            Options.Create(new SqliteAuditWriterOptions
            {
                DatabasePath = "ignored",
                BatchSize = 64,
                ChannelCapacity = 1024,
            }),
            NullLogger.Instance,
            connectionStringOverride:
                $"Data Source=file:cachedcall-g-{Guid.NewGuid():N}?mode=memory&cache=shared");

        Ring = new RingBufferFallback();
        FallbackWriter = new FallbackAuditWriter(
            SqliteWriter, Ring, new NoOpAuditWriteFailureCounter(),
            NullLogger.Instance);

        TrackingStore = new OperationTrackingStore(
            Options.Create(new OperationTrackingOptions
            {
                // Same shared-in-memory pattern as the audit writer.
                ConnectionString =
                    $"Data Source=file:tracking-g-{Guid.NewGuid():N}?mode=memory&cache=shared",
            }),
            NullLogger.Instance);

        // Central wiring: real repositories backed by the MSSQL fixture's DB.
        ServiceProvider = BuildCentralServiceProvider(siteCallRepoOverride);
        IngestActor = testKit.Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            ServiceProvider,
            NullLogger.Instance)));
        StubClient = new DirectActorSiteStreamAuditClient(IngestActor);

        // Production forwarder writes the local stores; the dispatcher wraps
        // it to ALSO push the same packet to central via the stub client.
        InnerForwarder = new CachedCallTelemetryForwarder(
            FallbackWriter, TrackingStore, NullLogger.Instance);
        Dispatcher = new CombinedTelemetryDispatcher(InnerForwarder, StubClient);
        Bridge = new CachedCallLifecycleBridge(Dispatcher, NullLogger.Instance);
    }

    /// <summary>
    /// Convenience: emit the initial submit packet directly through the
    /// dispatcher (the bridge's hooks fire only for S&amp;F retry-loop
    /// attempts; submit-row emission happens at the script call site).
    /// </summary>
    /// <param name="submit">The submit packet to forward.</param>
    /// <param name="ct">Cancellation token passed through to the dispatcher.</param>
    /// <returns>The task returned by the dispatcher's <c>ForwardAsync</c>.</returns>
    public Task EmitSubmitAsync(CachedCallTelemetry submit, CancellationToken ct = default) =>
        Dispatcher.ForwardAsync(submit, ct);

    /// <summary>
    /// Convenience: route a per-attempt or terminal outcome through the bridge.
    /// </summary>
    /// <param name="context">The attempt context handed to the bridge.</param>
    /// <param name="ct">Cancellation token passed through to the bridge.</param>
    /// <returns>The task returned by the bridge's <c>OnAttemptCompletedAsync</c>.</returns>
    public Task EmitAttemptAsync(CachedCallAttemptContext context, CancellationToken ct = default) =>
        Bridge.OnAttemptCompletedAsync(context, ct);

    /// <summary>
    /// Creates a new <see cref="ScadaLinkDbContext"/> on the fixture's
    /// connection string. Every call returns a fresh context; the caller is
    /// responsible for disposing it.
    /// </summary>
    public ScadaLinkDbContext CreateReadContext()
    {
        var options = new DbContextOptionsBuilder()
            .UseSqlServer(_fixture.ConnectionString)
            .Options;
        return new ScadaLinkDbContext(options);
    }

    /// <summary>
    /// Builds the central-side container: a SQL Server DbContext on the
    /// fixture's connection string plus scoped repository registrations.
    /// </summary>
    /// <param name="siteCallRepoOverride">
    /// When non-null, replaces the default <c>SiteCallAuditRepository</c>
    /// factory for the site-call repository registration.
    /// </param>
    private IServiceProvider BuildCentralServiceProvider(
        Func? siteCallRepoOverride)
    {
        var services = new ServiceCollection();

        // PendingModelChangesWarning is ignored so the context can be used
        // against the fixture database without that warning being raised.
        services.AddDbContext(opts =>
            opts.UseSqlServer(_fixture.ConnectionString)
                .ConfigureWarnings(w => w.Ignore(
                    Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));

        services.AddScoped(sp =>
            new AuditLogRepository(sp.GetRequiredService()));

        if (siteCallRepoOverride is null)
        {
            services.AddScoped(sp =>
                new SiteCallAuditRepository(sp.GetRequiredService()));
        }
        else
        {
            services.AddScoped(sp =>
                siteCallRepoOverride(sp.GetRequiredService()));
        }

        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Disposes the SQLite audit writer, the tracking store and the central
    /// service provider, in that order. Safe to call more than once; only the
    /// first call does any work.
    /// </summary>
    /// <remarks>
    /// Each step runs even when an earlier one throws, so a failing store
    /// cannot leak the remaining resources. The exception still propagates to
    /// the caller (the most recent one, if several steps fail). The ingest
    /// actor is not stopped here; it lives in the TestKit's actor system.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            await SqliteWriter.DisposeAsync().ConfigureAwait(false);
        }
        finally
        {
            try
            {
                await TrackingStore.DisposeAsync().ConfigureAwait(false);
            }
            finally
            {
                await DisposeServiceProviderAsync().ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Disposes <see cref="ServiceProvider"/>, preferring asynchronous disposal
    /// when the concrete provider supports it.
    /// </summary>
    private async ValueTask DisposeServiceProviderAsync()
    {
        if (ServiceProvider is IAsyncDisposable asyncProvider)
        {
            await asyncProvider.DisposeAsync().ConfigureAwait(false);
        }
        else if (ServiceProvider is IDisposable syncProvider)
        {
            syncProvider.Dispose();
        }
    }
}