From d03c2af9a102201268b182b01eff1f9cd5b50d76 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 09:49:43 -0400 Subject: [PATCH] fix(audit): race-safe channel cache + UTC-kind cursor handling in gRPC pull client (review) --- .../Central/GrpcPullAuditEventsClient.cs | 52 +++++++++++++++---- .../Central/GrpcPullAuditEventsClientTests.cs | 49 +++++++++++++++++ 2 files changed, 91 insertions(+), 10 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs index f836945b..bad3e88f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs @@ -114,9 +114,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient siteId, endpoint); return Empty; } - catch (OperationCanceledException) when (ct.IsCancellationRequested) + catch (OperationCanceledException) { - // Reconciliation tick was cancelled (host shutdown / scope dispose). + // Reconciliation tick was cancelled — either the caller's token + // (host shutdown / scope dispose) or an internal gRPC deadline / + // linked-CTS cancellation. Both are tolerable for a best-effort + // pull; collapse to empty rather than letting an internal + // cancellation land noisily in the catch-all below. return Empty; } catch (Exception ex) @@ -165,10 +169,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient StatusCode.DeadlineExceeded or StatusCode.Cancelled; + // All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the + // reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is + // therefore treated AS UTC — never ToUniversalTime()-converted: on a host + // with a positive UTC offset MinValue.ToUniversalTime() underflows and + // Timestamp.FromDateTime throws, crashing the first pull for every site. private static DateTime EnsureUtc(DateTime value) => - value.Kind == DateTimeKind.Utc - ? value - : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc); + value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc); /// /// Seam over the PullAuditEvents unary gRPC call against a resolved @@ -195,10 +202,15 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient /// Production : /// caches one per endpoint (keepalive from /// , mirroring SiteStreamGrpcClient) -/// and issues the unary PullAuditEventsAsync call. The cache flushes a -/// stale channel when an endpoint is re-keyed (NodeA→NodeB failover / address -/// edit), the same liveness guarantee SiteStreamGrpcClientFactory gives -/// the streaming client. +/// and issues the unary PullAuditEventsAsync call. The cache is keyed by +/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an +/// edited gRPC address) is reached as soon as the resolver hands the new +/// endpoint to — it creates a fresh channel for the +/// new address. Unlike SiteStreamGrpcClientFactory (keyed by siteId, +/// which actively evicts a re-keyed client), the channel for the previous +/// address is NOT actively evicted here; it lingers idle until +/// . Idle channels hold no streams, so this is a minor +/// cache footprint cost, not a correctness or liveness gap. /// public sealed class GrpcPullAuditEventsInvoker : GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable @@ -228,12 +240,32 @@ public sealed class GrpcPullAuditEventsInvoker public async Task InvokeAsync( string endpoint, ProtoPullRequest request, CancellationToken ct) { - var channel = _channels.GetOrAdd(endpoint, CreateChannel); + var channel = GetOrCreateChannel(endpoint); var client = new SiteStreamService.SiteStreamServiceClient(channel); using var call = client.PullAuditEventsAsync(request, cancellationToken: ct); return await call.ResponseAsync.ConfigureAwait(false); } + // Race-safe channel cache. ConcurrentDictionary.GetOrAdd(key, valueFactory) + // does NOT serialize the factory, so two concurrent first dials of the same + // endpoint can both build a GrpcChannel (each holds an HTTP/2 connection + // pool) and the loser would leak. Create-then-GetOrAdd-then-dispose-if-lost + // mirrors SiteStreamGrpcClientFactory: only the channel actually installed + // survives; a channel that lost the race is disposed immediately. + private GrpcChannel GetOrCreateChannel(string endpoint) + { + if (!_channels.TryGetValue(endpoint, out var channel)) + { + var created = CreateChannel(endpoint); + channel = _channels.GetOrAdd(endpoint, created); + if (!ReferenceEquals(channel, created)) + { + created.Dispose(); + } + } + return channel; + } + private GrpcChannel CreateChannel(string endpoint) => GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions { diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs index c6a42b7d..7664b36c 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs @@ -163,4 +163,53 @@ public class GrpcPullAuditEventsClientTests Assert.Empty(result.Events); Assert.False(result.MoreAvailable); } + + [Fact] + public async Task PullAsync_swallows_unexpected_faults_to_empty_response() + { + // I3(a): the catch-all path. A non-transport fault (e.g. a mapping/ + // protocol error surfacing as InvalidOperationException) must still be + // swallowed to empty — audit reconciliation is best-effort and a throw + // would only get re-caught by the actor's per-site guard. + var invoker = FakeInvoker.Throwing(new InvalidOperationException("boom")); + var sut = new GrpcPullAuditEventsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Empty(result.Events); + Assert.False(result.MoreAvailable); + } + + [Fact] + public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials() + { + // I3(b) / guards I2: the reconciliation cursor starts at DateTime.MinValue + // with Kind=Unspecified. EnsureUtc must treat it AS UTC (per the system-wide + // "all timestamps are UTC" invariant) and NOT call ToUniversalTime() — on a + // host with a positive UTC offset that underflows and Timestamp.FromDateTime + // throws ArgumentOutOfRangeException, crashing the FIRST pull for every site. + var minUnspecified = default(DateTime); // DateTime.MinValue, Kind=Unspecified + Assert.Equal(DateTimeKind.Unspecified, minUnspecified.Kind); + + var invoker = FakeInvoker.Returning(new ProtoPullResponse()); + var sut = new GrpcPullAuditEventsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + // MUST NOT throw — must dial successfully. + var result = await sut.PullAsync("site-a", minUnspecified, batchSize: 256, CancellationToken.None); + + Assert.Equal(1, invoker.CallCount); + Assert.Equal("http://site-a:8083", invoker.Endpoint); + Assert.NotNull(invoker.Request); + // The unspecified-MinValue cursor is carried through verbatim as UTC + // MinValue (no local-TZ conversion). + Assert.Equal(DateTime.MinValue, invoker.Request!.SinceUtc.ToDateTime()); + Assert.Empty(result.Events); + Assert.False(result.MoreAvailable); + } }