125 lines
5.0 KiB
C#
125 lines
5.0 KiB
C#
using ScadaLink.AuditLog.Site.Telemetry;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Communication.Grpc;
|
|
using Google.Protobuf.WellKnownTypes;
|
|
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
|
|
|
/// <summary>
/// Test-side combined-telemetry dispatcher: wraps a production
/// <see cref="ICachedCallTelemetryForwarder"/> so the local audit + tracking
/// stores still get written, then projects the same packet onto the wire as a
/// <see cref="CachedTelemetryBatch"/> and pushes it through the supplied
/// <see cref="ISiteStreamAuditClient"/>. The bridge can be composed into the
/// existing <see cref="CachedCallLifecycleBridge"/> chain as the
/// <see cref="ICachedCallTelemetryForwarder"/> implementation so a single
/// observer callback fans out to both halves.
/// </summary>
/// <remarks>
/// <para>
/// Production wiring keeps the wire push deferred to M6 — the site SQLite hot
/// path is the source of truth and a future M6 drain will push the rows
/// through the gRPC client. For end-to-end testing today we need a way to
/// exercise the central dual-write transaction immediately, so this
/// dispatcher synthesises the wire packet inline and round-trips it through
/// the stub client. The shape mirrors what the M6 drain will eventually emit.
/// </para>
/// <para>
/// <b>Best-effort:</b> both the inner forwarder call and the wire push are
/// wrapped in independent try/catch blocks. A throwing wire client doesn't
/// abort the local writes (the inner forwarder has already run); a throwing
/// local forwarder doesn't abort the wire push (central still gets the
/// dual-write attempt).
/// </para>
/// <para>
/// <b>Timestamps:</b> the operational <c>*AtUtc</c> values are normalised to
/// <see cref="DateTimeKind.Utc"/> before being converted to protobuf
/// timestamps. A value whose kind is <see cref="DateTimeKind.Unspecified"/>
/// is taken to already be UTC (as the property names state) and is not
/// shifted by the host's local offset.
/// </para>
/// </remarks>
public sealed class CombinedTelemetryDispatcher : ICachedCallTelemetryForwarder
{
    private readonly ICachedCallTelemetryForwarder _inner;
    private readonly ISiteStreamAuditClient _wireClient;

    /// <summary>
    /// Creates a dispatcher that forwards to <paramref name="inner"/> first and
    /// then pushes the same telemetry through <paramref name="wireClient"/>.
    /// </summary>
    /// <param name="inner">Forwarder invoked first for every telemetry item.</param>
    /// <param name="wireClient">Client that receives the synthesised <see cref="CachedTelemetryBatch"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="inner"/> or <paramref name="wireClient"/> is <see langword="null"/>.
    /// </exception>
    public CombinedTelemetryDispatcher(
        ICachedCallTelemetryForwarder inner,
        ISiteStreamAuditClient wireClient)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _wireClient = wireClient ?? throw new ArgumentNullException(nameof(wireClient));
    }

    /// <inheritdoc/>
    public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(telemetry);

        // Inner forwarder writes the audit row to SQLite + updates the
        // tracking store. Best-effort — exceptions are already swallowed
        // inside the production forwarder, but wrap defensively here too in
        // case a test substitutes a stricter inner.
        try
        {
            await _inner.ForwardAsync(telemetry, ct).ConfigureAwait(false);
        }
        catch
        {
            // Deliberately swallowed — alog.md §7 best-effort contract. A
            // failing local write must not prevent the wire push below.
        }

        // Project the same packet onto the wire and push it through the stub
        // client. This is the bit a future M6 drain will subsume — until
        // then the test wraps the two halves into one observer-driven step.
        // Packet construction sits inside the try so that a mapping failure
        // is treated exactly like a failed push.
        try
        {
            var batch = new CachedTelemetryBatch();
            batch.Packets.Add(BuildPacket(telemetry));
            await _wireClient.IngestCachedTelemetryAsync(batch, ct).ConfigureAwait(false);
        }
        catch
        {
            // Deliberately swallowed — the audit row is still in SQLite for a
            // future drain; the central row will materialise the next time
            // the wire path is exercised (or via the M6 reconciliation pull).
        }
    }

    /// <summary>
    /// Builds the single wire packet for <paramref name="telemetry"/>: the
    /// audit half is mapped by <see cref="AuditEventDtoMapper"/>, the
    /// operational half by <see cref="ToOperationalDto"/>.
    /// </summary>
    private static CachedTelemetryPacket BuildPacket(CachedCallTelemetry telemetry)
    {
        return new CachedTelemetryPacket
        {
            AuditEvent = AuditEventDtoMapper.ToDto(telemetry.Audit),
            Operational = ToOperationalDto(telemetry.Operational),
        };
    }

    /// <summary>
    /// Copies the operational record onto its wire DTO.
    /// </summary>
    /// <remarks>
    /// A <see langword="null"/> <c>LastError</c> is sent as an empty string.
    /// <c>HttpStatus</c> and <c>TerminalAtUtc</c> are assigned only when the
    /// source carries a value; otherwise the DTO members are left untouched.
    /// All timestamps pass through <see cref="EnsureUtc"/> first.
    /// </remarks>
    private static SiteCallOperationalDto ToOperationalDto(SiteCallOperational op)
    {
        var dto = new SiteCallOperationalDto
        {
            TrackedOperationId = op.TrackedOperationId.Value.ToString("D"),
            Channel = op.Channel,
            Target = op.Target,
            SourceSite = op.SourceSite,
            Status = op.Status,
            RetryCount = op.RetryCount,
            LastError = op.LastError ?? string.Empty,
            CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.CreatedAtUtc)),
            UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.UpdatedAtUtc)),
        };

        if (op.HttpStatus.HasValue)
        {
            dto.HttpStatus = op.HttpStatus.Value;
        }

        if (op.TerminalAtUtc.HasValue)
        {
            dto.TerminalAtUtc = Timestamp.FromDateTime(EnsureUtc(op.TerminalAtUtc.Value));
        }

        return dto;
    }

    /// <summary>
    /// Returns <paramref name="value"/> as a <see cref="DateTimeKind.Utc"/>
    /// instant, which <c>Timestamp.FromDateTime</c> requires.
    /// </summary>
    /// <remarks>
    /// <list type="bullet">
    /// <item><description><see cref="DateTimeKind.Utc"/> — returned unchanged.</description></item>
    /// <item><description><see cref="DateTimeKind.Local"/> — converted with <see cref="DateTime.ToUniversalTime"/>.</description></item>
    /// <item><description>
    /// <see cref="DateTimeKind.Unspecified"/> — re-stamped as UTC without
    /// changing the ticks. <see cref="DateTime.ToUniversalTime"/> would treat
    /// such a value as local time and shift it by the host's offset, which
    /// would corrupt a value that the <c>*AtUtc</c> naming says is already UTC.
    /// </description></item>
    /// </list>
    /// </remarks>
    private static DateTime EnsureUtc(DateTime value) => value.Kind switch
    {
        DateTimeKind.Utc => value,
        DateTimeKind.Local => value.ToUniversalTime(),
        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
    };
}
|