fix(audit): race-safe channel cache + UTC-kind cursor handling in gRPC pull client (review)
This commit is contained in:
@@ -114,9 +114,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
|
||||
siteId, endpoint);
|
||||
return Empty;
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Reconciliation tick was cancelled (host shutdown / scope dispose).
|
||||
// Reconciliation tick was cancelled — either the caller's token
|
||||
// (host shutdown / scope dispose) or an internal gRPC deadline /
|
||||
// linked-CTS cancellation. Both are tolerable for a best-effort
|
||||
// pull; collapse to empty rather than letting an internal
|
||||
// cancellation land noisily in the catch-all below.
|
||||
return Empty;
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -165,10 +169,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
|
||||
StatusCode.DeadlineExceeded or
|
||||
StatusCode.Cancelled;
|
||||
|
||||
// All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the
|
||||
// reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is
|
||||
// therefore treated AS UTC — never ToUniversalTime()-converted: on a host
|
||||
// with a positive UTC offset MinValue.ToUniversalTime() underflows and
|
||||
// Timestamp.FromDateTime throws, crashing the first pull for every site.
|
||||
private static DateTime EnsureUtc(DateTime value) =>
|
||||
value.Kind == DateTimeKind.Utc
|
||||
? value
|
||||
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
|
||||
value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
|
||||
|
||||
/// <summary>
|
||||
/// Seam over the <c>PullAuditEvents</c> unary gRPC call against a resolved
|
||||
@@ -195,10 +202,15 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
|
||||
/// Production <see cref="GrpcPullAuditEventsClient.IPullAuditEventsInvoker"/>:
|
||||
/// caches one <see cref="GrpcChannel"/> per endpoint (keepalive from
|
||||
/// <see cref="CommunicationOptions"/>, mirroring <c>SiteStreamGrpcClient</c>)
|
||||
/// and issues the unary <c>PullAuditEventsAsync</c> call. The cache flushes a
|
||||
/// stale channel when an endpoint is re-keyed (NodeA→NodeB failover / address
|
||||
/// edit), the same liveness guarantee <c>SiteStreamGrpcClientFactory</c> gives
|
||||
/// the streaming client.
|
||||
/// and issues the unary <c>PullAuditEventsAsync</c> call. The cache is keyed by
|
||||
/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an
|
||||
/// edited gRPC address) is reached as soon as the resolver hands the new
|
||||
/// endpoint to <see cref="InvokeAsync"/> — it creates a fresh channel for the
|
||||
/// new address. Unlike <c>SiteStreamGrpcClientFactory</c> (keyed by siteId,
|
||||
/// which actively evicts a re-keyed client), the channel for the previous
|
||||
/// address is NOT actively evicted here; it lingers idle until
|
||||
/// <see cref="Dispose"/>. Idle channels hold no streams, so this is a minor
|
||||
/// cache footprint cost, not a correctness or liveness gap.
|
||||
/// </summary>
|
||||
public sealed class GrpcPullAuditEventsInvoker
|
||||
: GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable
|
||||
@@ -228,12 +240,32 @@ public sealed class GrpcPullAuditEventsInvoker
|
||||
public async Task<ProtoPullResponse> InvokeAsync(
|
||||
string endpoint, ProtoPullRequest request, CancellationToken ct)
|
||||
{
|
||||
var channel = _channels.GetOrAdd(endpoint, CreateChannel);
|
||||
var channel = GetOrCreateChannel(endpoint);
|
||||
var client = new SiteStreamService.SiteStreamServiceClient(channel);
|
||||
using var call = client.PullAuditEventsAsync(request, cancellationToken: ct);
|
||||
return await call.ResponseAsync.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Race-safe channel cache. ConcurrentDictionary.GetOrAdd(key, valueFactory)
|
||||
// does NOT serialize the factory, so two concurrent first dials of the same
|
||||
// endpoint can both build a GrpcChannel (each holds an HTTP/2 connection
|
||||
// pool) and the loser would leak. Create-then-GetOrAdd-then-dispose-if-lost
|
||||
// mirrors SiteStreamGrpcClientFactory: only the channel actually installed
|
||||
// survives; a channel that lost the race is disposed immediately.
|
||||
private GrpcChannel GetOrCreateChannel(string endpoint)
|
||||
{
|
||||
if (!_channels.TryGetValue(endpoint, out var channel))
|
||||
{
|
||||
var created = CreateChannel(endpoint);
|
||||
channel = _channels.GetOrAdd(endpoint, created);
|
||||
if (!ReferenceEquals(channel, created))
|
||||
{
|
||||
created.Dispose();
|
||||
}
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
private GrpcChannel CreateChannel(string endpoint) =>
|
||||
GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
|
||||
{
|
||||
|
||||
@@ -163,4 +163,53 @@ public class GrpcPullAuditEventsClientTests
|
||||
Assert.Empty(result.Events);
|
||||
Assert.False(result.MoreAvailable);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PullAsync_swallows_unexpected_faults_to_empty_response()
|
||||
{
|
||||
// I3(a): the catch-all path. A non-transport fault (e.g. a mapping/
|
||||
// protocol error surfacing as InvalidOperationException) must still be
|
||||
// swallowed to empty — audit reconciliation is best-effort and a throw
|
||||
// would only get re-caught by the actor's per-site guard.
|
||||
var invoker = FakeInvoker.Throwing(new InvalidOperationException("boom"));
|
||||
var sut = new GrpcPullAuditEventsClient(
|
||||
new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
|
||||
invoker,
|
||||
NullLogger<GrpcPullAuditEventsClient>.Instance);
|
||||
|
||||
var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
|
||||
|
||||
Assert.Empty(result.Events);
|
||||
Assert.False(result.MoreAvailable);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
|
||||
{
|
||||
// I3(b) / guards I2: the reconciliation cursor starts at DateTime.MinValue
|
||||
// with Kind=Unspecified. EnsureUtc must treat it AS UTC (per the system-wide
|
||||
// "all timestamps are UTC" invariant) and NOT call ToUniversalTime() — on a
|
||||
// host with a positive UTC offset that underflows and Timestamp.FromDateTime
|
||||
// throws ArgumentOutOfRangeException, crashing the FIRST pull for every site.
|
||||
var minUnspecified = default(DateTime); // DateTime.MinValue, Kind=Unspecified
|
||||
Assert.Equal(DateTimeKind.Unspecified, minUnspecified.Kind);
|
||||
|
||||
var invoker = FakeInvoker.Returning(new ProtoPullResponse());
|
||||
var sut = new GrpcPullAuditEventsClient(
|
||||
new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
|
||||
invoker,
|
||||
NullLogger<GrpcPullAuditEventsClient>.Instance);
|
||||
|
||||
// MUST NOT throw — must dial successfully.
|
||||
var result = await sut.PullAsync("site-a", minUnspecified, batchSize: 256, CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, invoker.CallCount);
|
||||
Assert.Equal("http://site-a:8083", invoker.Endpoint);
|
||||
Assert.NotNull(invoker.Request);
|
||||
// The unspecified-MinValue cursor is carried through verbatim as UTC
|
||||
// MinValue (no local-TZ conversion).
|
||||
Assert.Equal(DateTime.MinValue, invoker.Request!.SinceUtc.ToDateTime());
|
||||
Assert.Empty(result.Events);
|
||||
Assert.False(result.MoreAvailable);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user