using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.SiteRuntime.Tracking;

namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;

// NOTE(review): the generic type arguments in this file (logger categories,
// the DbContext type and the repository service interfaces) were restored
// from how the values are used — confirm them against the real declarations.

/// <summary>
/// Shared end-to-end harness for the M3 cached-call combined telemetry tests
/// (G2/G3/G4). Composes the full pipeline:
/// <list type="bullet">
///   <item>Site-local SQLite <see cref="SqliteAuditWriter"/> (in-memory) +
///   <see cref="RingBufferFallback"/> + <see cref="FallbackAuditWriter"/>.</item>
///   <item>Site-local SQLite <see cref="OperationTrackingStore"/> (in-memory).</item>
///   <item>Production <see cref="CachedCallTelemetryForwarder"/> wrapped by a
///   test-side <see cref="CombinedTelemetryDispatcher"/> that also pushes each
///   packet through the stub gRPC client.</item>
///   <item><see cref="CachedCallLifecycleBridge"/> wired to the
///   dispatcher so a single observer call fans out audit + tracking + wire.</item>
///   <item><see cref="DirectActorSiteStreamAuditClient"/> connected
///   to an <see cref="AuditLogIngestActor"/> backed by the real
///   <see cref="AuditLogRepository"/> + <see cref="SiteCallAuditRepository"/>
///   against the per-test <see cref="MsSqlMigrationFixture"/> database.</item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// Disposal cleans up the in-memory SQLite stores. The Akka actor system is
/// owned by the calling <see cref="Akka.TestKit.Xunit2.TestKit"/>; the harness
/// only owns the ingest actor IActorRef and the underlying repositories'
/// DbContext lifecycle.
/// </para>
/// </remarks>
public sealed class CombinedTelemetryHarness : IAsyncDisposable
{
    /// <summary>Site-side audit writer on a per-harness in-memory SQLite database.</summary>
    public SqliteAuditWriter SqliteWriter { get; }

    /// <summary>Ring-buffer fallback handed to <see cref="FallbackWriter"/>.</summary>
    public RingBufferFallback Ring { get; }

    /// <summary>Writer composed from <see cref="SqliteWriter"/> and <see cref="Ring"/>.</summary>
    public FallbackAuditWriter FallbackWriter { get; }

    /// <summary>Operation tracking store on its own per-harness in-memory SQLite database.</summary>
    public OperationTrackingStore TrackingStore { get; }

    /// <summary>
    /// Production forwarder built over <see cref="FallbackWriter"/> and
    /// <see cref="TrackingStore"/>.
    /// </summary>
    public CachedCallTelemetryForwarder InnerForwarder { get; }

    /// <summary>
    /// Test-side dispatcher built over <see cref="InnerForwarder"/> and
    /// <see cref="StubClient"/>.
    /// </summary>
    public CombinedTelemetryDispatcher Dispatcher { get; }

    /// <summary>Lifecycle bridge built over <see cref="Dispatcher"/>.</summary>
    public CachedCallLifecycleBridge Bridge { get; }

    /// <summary>Stub stream client built over <see cref="IngestActor"/>.</summary>
    public DirectActorSiteStreamAuditClient StubClient { get; }

    /// <summary>
    /// The <see cref="AuditLogIngestActor"/> instance created in the test kit's
    /// actor system.
    /// </summary>
    public IActorRef IngestActor { get; }

    /// <summary>
    /// Central-side container: the DbContext and the repositories, all
    /// registered against the fixture's SQL Server connection string.
    /// </summary>
    public IServiceProvider ServiceProvider { get; }

    private readonly MsSqlMigrationFixture _fixture;
    private bool _disposed;

    /// <summary>
    /// Builds the site-side stores, the central service provider and ingest
    /// actor, and wires the forwarder, dispatcher and bridge together.
    /// </summary>
    /// <param name="fixture">Supplies the SQL Server connection string for the central side.</param>
    /// <param name="testKit">Test kit whose actor system hosts the ingest actor.</param>
    /// <param name="siteCallRepoOverride">
    /// Optional factory that replaces the default
    /// <see cref="SiteCallAuditRepository"/> registration; it receives the
    /// scoped DbContext.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="fixture"/> or <paramref name="testKit"/> is <c>null</c>.
    /// </exception>
    public CombinedTelemetryHarness(
        MsSqlMigrationFixture fixture,
        Akka.TestKit.Xunit2.TestKit testKit,
        Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride = null)
    {
        _fixture = fixture ?? throw new ArgumentNullException(nameof(fixture));
        ArgumentNullException.ThrowIfNull(testKit);

        // Site SQLite — unique in-memory database per harness so tests don't share
        // an audit queue. Mode=Memory + Cache=Shared keeps the file alive for the
        // lifetime of the writer connection.
        SqliteWriter = new SqliteAuditWriter(
            Options.Create(new SqliteAuditWriterOptions
            {
                DatabasePath = "ignored",
                BatchSize = 64,
                ChannelCapacity = 1024,
            }),
            NullLogger<SqliteAuditWriter>.Instance,
            connectionStringOverride:
                $"Data Source=file:cachedcall-g-{Guid.NewGuid():N}?mode=memory&cache=shared");

        Ring = new RingBufferFallback();

        FallbackWriter = new FallbackAuditWriter(
            SqliteWriter,
            Ring,
            new NoOpAuditWriteFailureCounter(),
            NullLogger<FallbackAuditWriter>.Instance);

        TrackingStore = new OperationTrackingStore(
            Options.Create(new OperationTrackingOptions
            {
                // Same shared-in-memory pattern as the audit writer.
                ConnectionString =
                    $"Data Source=file:tracking-g-{Guid.NewGuid():N}?mode=memory&cache=shared",
            }),
            NullLogger<OperationTrackingStore>.Instance);

        // Central wiring: real repositories backed by the MSSQL fixture's DB.
        ServiceProvider = BuildCentralServiceProvider(siteCallRepoOverride);

        IngestActor = testKit.Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            ServiceProvider,
            NullLogger<AuditLogIngestActor>.Instance)));

        StubClient = new DirectActorSiteStreamAuditClient(IngestActor);

        // Production forwarder writes the local stores; the dispatcher wraps
        // it to ALSO push the same packet to central via the stub client.
        InnerForwarder = new CachedCallTelemetryForwarder(
            FallbackWriter,
            TrackingStore,
            NullLogger<CachedCallTelemetryForwarder>.Instance);

        Dispatcher = new CombinedTelemetryDispatcher(InnerForwarder, StubClient);

        Bridge = new CachedCallLifecycleBridge(
            Dispatcher,
            NullLogger<CachedCallLifecycleBridge>.Instance);
    }

    /// <summary>
    /// Convenience: emit the initial submit packet directly through the
    /// dispatcher (the bridge's hooks fire only for S&amp;F retry-loop
    /// attempts; submit-row emission happens at the script call site).
    /// </summary>
    /// <param name="submit">The submit telemetry packet to forward.</param>
    /// <param name="ct">Cancellation token passed through to the dispatcher.</param>
    public Task EmitSubmitAsync(CachedCallTelemetry submit, CancellationToken ct = default) =>
        Dispatcher.ForwardAsync(submit, ct);

    /// <summary>
    /// Convenience: route a per-attempt or terminal outcome through the bridge.
    /// </summary>
    /// <param name="context">The attempt context handed to the bridge.</param>
    /// <param name="ct">Cancellation token passed through to the bridge.</param>
    public Task EmitAttemptAsync(CachedCallAttemptContext context, CancellationToken ct = default) =>
        Bridge.OnAttemptCompletedAsync(context, ct);

    /// <summary>
    /// Creates a new <see cref="ScadaLinkDbContext"/> against the fixture's
    /// SQL Server connection string. Each call returns a fresh context; the
    /// caller is responsible for disposing it.
    /// </summary>
    public ScadaLinkDbContext CreateReadContext()
    {
        var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString)
            .Options;

        return new ScadaLinkDbContext(options);
    }

    /// <summary>
    /// Registers the DbContext and the two audit repositories (scoped) and
    /// builds the central-side service provider.
    /// </summary>
    /// <param name="siteCallRepoOverride">
    /// When non-null, used instead of <see cref="SiteCallAuditRepository"/> to
    /// create the site-call repository from the scoped DbContext.
    /// </param>
    private IServiceProvider BuildCentralServiceProvider(
        Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride)
    {
        var services = new ServiceCollection();

        // The pending-model-changes warning is explicitly ignored for this context.
        services.AddDbContext<ScadaLinkDbContext>(opts =>
            opts.UseSqlServer(_fixture.ConnectionString)
                .ConfigureWarnings(w => w.Ignore(
                    Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));

        services.AddScoped<IAuditLogRepository>(sp =>
            new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));

        if (siteCallRepoOverride is null)
        {
            services.AddScoped<ISiteCallAuditRepository>(sp =>
                new SiteCallAuditRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
        }
        else
        {
            services.AddScoped<ISiteCallAuditRepository>(sp =>
                siteCallRepoOverride(sp.GetRequiredService<ScadaLinkDbContext>()));
        }

        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Disposes the SQLite audit writer, the tracking store and the central
    /// service provider, in that order. Repeated calls are no-ops.
    /// </summary>
    /// <remarks>
    /// Each later disposal runs even when an earlier one throws, so a failing
    /// store cannot leak the remaining resources. The exception still
    /// propagates; if more than one disposal throws, the last one wins.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            await SqliteWriter.DisposeAsync().ConfigureAwait(false);
        }
        finally
        {
            try
            {
                await TrackingStore.DisposeAsync().ConfigureAwait(false);
            }
            finally
            {
                await DisposeServiceProviderAsync().ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Disposes <see cref="ServiceProvider"/>, preferring asynchronous disposal
    /// when the provider supports it.
    /// </summary>
    private async ValueTask DisposeServiceProviderAsync()
    {
        switch (ServiceProvider)
        {
            case IAsyncDisposable asyncDisposable:
                await asyncDisposable.DisposeAsync().ConfigureAwait(false);
                break;

            case IDisposable disposable:
                disposable.Dispose();
                break;
        }
    }
}