feat(auditlog): extend ISiteStreamAuditClient with IngestCachedTelemetryAsync (#23 M3)
Add the second site→central RPC seam alongside the existing M2 IngestAuditEventsAsync. The Bundle D proto already lit up IngestCachedTelemetry (CachedTelemetryBatch / IngestAck) so this commit just plumbs the client-side abstraction: * ISiteStreamAuditClient gains IngestCachedTelemetryAsync(batch, ct). * NoOpSiteStreamAuditClient implements it returning an empty IngestAck (same shape as M2 — production gRPC client lands in M6). * SyncCallEmissionEndToEndTests' DirectActorSiteStreamAuditClient stub throws NotSupportedException from the new method so a regression that accidentally routes a cached packet through the sync stub fails loudly. * New NoOpSiteStreamAuditClientTests cover the null-guard + empty-ack contract for both batch shapes. Bundle E task E1.
This commit is contained in:
@@ -20,4 +20,23 @@ public interface ISiteStreamAuditClient
|
|||||||
/// in the site SQLite queue.
|
/// in the site SQLite queue.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct);
|
Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Pushes the combined <see cref="CachedTelemetryBatch"/> (Audit Log #23 / M3)
|
||||||
|
/// to the central <c>IngestCachedTelemetry</c> RPC. Each packet carries both
|
||||||
|
/// the audit row and the operational <c>SiteCalls</c> upsert; central writes
|
||||||
|
/// both in a single MS SQL transaction. Returns the same
|
||||||
|
/// <see cref="IngestAck"/> shape as <see cref="IngestAuditEventsAsync"/> so
|
||||||
|
/// the M3 site-side forwarder can flip the underlying audit rows to
|
||||||
|
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>
|
||||||
|
/// once central has acknowledged them.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// The production gRPC-backed implementation lands in M6 (no site→central
|
||||||
|
/// gRPC channel exists today); until then the default
|
||||||
|
/// <see cref="NoOpSiteStreamAuditClient"/> binding returns an empty ack and
|
||||||
|
/// integration tests substitute a direct-actor client that routes the batch
|
||||||
|
/// straight into the in-process <c>AuditLogIngestActor</c>.
|
||||||
|
/// </remarks>
|
||||||
|
Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,4 +38,16 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
|
|||||||
// Pending until M6's real client (or a Bundle H test stub) takes over.
|
// Pending until M6's real client (or a Bundle H test stub) takes over.
|
||||||
return Task.FromResult(EmptyAck);
|
return Task.FromResult(EmptyAck);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(batch);
|
||||||
|
// Empty ack — same rationale as IngestAuditEventsAsync. The M3
|
||||||
|
// CachedCallTelemetryForwarder still writes the audit + tracking rows to
|
||||||
|
// the site SQLite stores authoritatively; central-side state only
|
||||||
|
// materialises once M6's real gRPC client (or a Bundle G test stub) is
|
||||||
|
// wired in.
|
||||||
|
return Task.FromResult(EmptyAck);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -337,5 +337,17 @@ public class SyncCallEmissionEndToEndTests : TestKit, IClassFixture<MsSqlMigrati
|
|||||||
}
|
}
|
||||||
return ack;
|
return ack;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Bundle E E1: the sync-only end-to-end suite does not exercise the
|
||||||
|
/// cached-telemetry path. Throw if it is ever called from these tests
|
||||||
|
/// so a regression that accidentally routes a cached packet through
|
||||||
|
/// the sync stub fails loudly rather than silently no-op'ing.
|
||||||
|
/// </summary>
|
||||||
|
public Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
|
||||||
|
{
|
||||||
|
throw new NotSupportedException(
|
||||||
|
"Sync-call test stub does not implement cached telemetry — use the M3 cached-call client.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,58 @@
|
|||||||
|
using ScadaLink.AuditLog.Site.Telemetry;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Bundle E E1 tests for <see cref="NoOpSiteStreamAuditClient"/>. The NoOp
|
||||||
|
/// client is the default <see cref="ISiteStreamAuditClient"/> binding until M6
|
||||||
|
/// delivers the gRPC-backed implementation; both <c>IngestAuditEventsAsync</c>
|
||||||
|
/// (M2) and <c>IngestCachedTelemetryAsync</c> (M3) must return an empty ack
|
||||||
|
/// (no rows flipped to Forwarded) without throwing or partially handling the
|
||||||
|
/// batch.
|
||||||
|
/// </summary>
|
||||||
|
public class NoOpSiteStreamAuditClientTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task IngestCachedTelemetryAsync_EmptyBatch_ReturnsEmptyAck()
|
||||||
|
{
|
||||||
|
var sut = new NoOpSiteStreamAuditClient();
|
||||||
|
var batch = new CachedTelemetryBatch();
|
||||||
|
|
||||||
|
var ack = await sut.IngestCachedTelemetryAsync(batch, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.NotNull(ack);
|
||||||
|
Assert.Empty(ack.AcceptedEventIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task IngestCachedTelemetryAsync_PopulatedBatch_ReturnsEmptyAck()
|
||||||
|
{
|
||||||
|
var sut = new NoOpSiteStreamAuditClient();
|
||||||
|
var batch = new CachedTelemetryBatch();
|
||||||
|
batch.Packets.Add(new CachedTelemetryPacket
|
||||||
|
{
|
||||||
|
AuditEvent = new AuditEventDto
|
||||||
|
{
|
||||||
|
EventId = Guid.NewGuid().ToString(),
|
||||||
|
Channel = "ApiOutbound",
|
||||||
|
Kind = "CachedSubmit",
|
||||||
|
Status = "Submitted",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
var ack = await sut.IngestCachedTelemetryAsync(batch, CancellationToken.None);
|
||||||
|
|
||||||
|
// No EventIds flipped — NoOp does not forward to anyone.
|
||||||
|
Assert.Empty(ack.AcceptedEventIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task IngestCachedTelemetryAsync_NullBatch_Throws()
|
||||||
|
{
|
||||||
|
var sut = new NoOpSiteStreamAuditClient();
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||||
|
() => sut.IngestCachedTelemetryAsync(null!, CancellationToken.None));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user