using ScadaLink.AuditLog.Site.Telemetry; using ScadaLink.AuditLog.Telemetry; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Messages.Integration; using ScadaLink.Commons.Types; using ScadaLink.Communication.Grpc; using Google.Protobuf.WellKnownTypes; using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp; namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure; /// /// Test-side combined-telemetry dispatcher: wraps a production /// so the local audit + tracking /// stores still get written, then projects the same packet onto the wire as a /// and pushes it through the supplied /// . The bridge can be composed into the /// existing chain as the /// implementation so a single /// observer callback fans out to both halves. /// /// /// /// Production wiring keeps the wire push deferred to M6 — the site SQLite hot /// path is the source of truth and a future M6 drain will push the rows /// through the gRPC client. For end-to-end testing today we need a way to /// exercise the central dual-write transaction immediately, so this /// dispatcher synthesises the wire packet inline and round-trips it through /// the stub client. The shape mirrors what the M6 drain will eventually emit. /// /// /// Best-effort: both the inner forwarder call and the wire push are /// wrapped in independent try/catch blocks. A thrown wire client doesn't /// abort the local writes (the audit row is already in SQLite); a thrown /// local forwarder doesn't abort the wire push (central still gets the /// dual-write attempt). /// /// public sealed class CombinedTelemetryDispatcher : ICachedCallTelemetryForwarder { private readonly ICachedCallTelemetryForwarder _inner; private readonly ISiteStreamAuditClient _wireClient; public CombinedTelemetryDispatcher( ICachedCallTelemetryForwarder inner, ISiteStreamAuditClient wireClient) { _inner = inner ?? throw new ArgumentNullException(nameof(inner)); _wireClient = wireClient ?? throw new ArgumentNullException(nameof(wireClient)); } /// public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default) { ArgumentNullException.ThrowIfNull(telemetry); // Inner forwarder writes the audit row to SQLite + updates the // tracking store. Best-effort — exceptions are already swallowed // inside the production forwarder, but wrap defensively here too in // case a test substitutes a stricter inner. try { await _inner.ForwardAsync(telemetry, ct).ConfigureAwait(false); } catch { // Swallow — alog.md §7 best-effort contract. } // Project the same packet onto the wire and push it through the stub // client. This is the bit a future M6 drain will subsume — until // then the test wraps the two halves into one observer-driven step. try { var batch = new CachedTelemetryBatch(); batch.Packets.Add(BuildPacket(telemetry)); await _wireClient.IngestCachedTelemetryAsync(batch, ct).ConfigureAwait(false); } catch { // Swallow — the audit row is still in SQLite for a future drain; // the central row will materialise the next time the wire path // is exercised (or via the M6 reconciliation pull). } } private static CachedTelemetryPacket BuildPacket(CachedCallTelemetry telemetry) { return new CachedTelemetryPacket { AuditEvent = AuditEventMapper.ToDto(telemetry.Audit), Operational = ToOperationalDto(telemetry.Operational), }; } private static SiteCallOperationalDto ToOperationalDto(SiteCallOperational op) { var dto = new SiteCallOperationalDto { TrackedOperationId = op.TrackedOperationId.Value.ToString("D"), Channel = op.Channel, Target = op.Target, SourceSite = op.SourceSite, Status = op.Status, RetryCount = op.RetryCount, LastError = op.LastError ?? string.Empty, CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.CreatedAtUtc)), UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.UpdatedAtUtc)), }; if (op.HttpStatus.HasValue) { dto.HttpStatus = op.HttpStatus.Value; } if (op.TerminalAtUtc.HasValue) { dto.TerminalAtUtc = Timestamp.FromDateTime(EnsureUtc(op.TerminalAtUtc.Value)); } return dto; } private static DateTime EnsureUtc(DateTime value) => value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc); }