diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
index f836945b..bad3e88f 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
@@ -114,9 +114,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
siteId, endpoint);
return Empty;
}
- catch (OperationCanceledException) when (ct.IsCancellationRequested)
+ catch (OperationCanceledException)
{
- // Reconciliation tick was cancelled (host shutdown / scope dispose).
+ // Reconciliation tick was cancelled — either the caller's token
+ // (host shutdown / scope dispose) or an internal gRPC deadline /
+ // linked-CTS cancellation. Both are tolerable for a best-effort
+ // pull; collapse to empty rather than letting an internal
+ // cancellation land noisily in the catch-all below.
return Empty;
}
catch (Exception ex)
@@ -165,10 +169,13 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
StatusCode.DeadlineExceeded or
StatusCode.Cancelled;
+ // All ScadaBridge timestamps are UTC by invariant, so any non-UTC cursor is
+ // re-labelled AS UTC with unchanged ticks (Kind=Local as well as the
+ // Kind=Unspecified DateTime.MinValue the reconciliation cursor starts at) —
+ // never ToUniversalTime()-converted: on a host with a positive UTC offset that
+ // underflows for MinValue and Timestamp.FromDateTime throws on every site's first pull.
private static DateTime EnsureUtc(DateTime value) =>
- value.Kind == DateTimeKind.Utc
- ? value
- : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
+ value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
///
/// Seam over the PullAuditEvents unary gRPC call against a resolved
@@ -195,10 +202,15 @@ public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
/// Production :
/// caches one per endpoint (keepalive from
/// , mirroring SiteStreamGrpcClient)
-/// and issues the unary PullAuditEventsAsync call. The cache flushes a
-/// stale channel when an endpoint is re-keyed (NodeA→NodeB failover / address
-/// edit), the same liveness guarantee SiteStreamGrpcClientFactory gives
-/// the streaming client.
+/// and issues the unary PullAuditEventsAsync call. The cache is keyed by
+/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an
+/// edited gRPC address) is reached as soon as the resolver hands the new
+/// endpoint to <see cref="InvokeAsync"/> — it creates a fresh channel for the
+/// new address. Unlike SiteStreamGrpcClientFactory (keyed by siteId,
+/// which actively evicts a re-keyed client), the channel for the previous
+/// address is NOT actively evicted here; it lingers idle until
+/// <see cref="Dispose"/>. Idle channels hold no streams, so this is a minor
+/// cache footprint cost, not a correctness or liveness gap.
///
public sealed class GrpcPullAuditEventsInvoker
: GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable
@@ -228,12 +240,32 @@ public sealed class GrpcPullAuditEventsInvoker
public async Task InvokeAsync(
string endpoint, ProtoPullRequest request, CancellationToken ct)
{
- var channel = _channels.GetOrAdd(endpoint, CreateChannel);
+ var channel = GetOrCreateChannel(endpoint); // cached per endpoint; a channel that loses a creation race is disposed
var client = new SiteStreamService.SiteStreamServiceClient(channel);
using var call = client.PullAuditEventsAsync(request, cancellationToken: ct);
return await call.ResponseAsync.ConfigureAwait(false);
}
+    // Race-safe channel cache. ConcurrentDictionary.GetOrAdd(key, valueFactory)
+    // does NOT serialize the factory, so two concurrent first dials of the same
+    // endpoint could each build a GrpcChannel and the loser would leak. Instead,
+    // mirroring SiteStreamGrpcClientFactory: build a candidate, offer it with
+    // GetOrAdd(key, value), and dispose it at once if another channel won.
+    private GrpcChannel GetOrCreateChannel(string endpoint)
+    {
+        if (_channels.TryGetValue(endpoint, out var cached))
+        {
+            return cached;
+        }
+        var candidate = CreateChannel(endpoint);
+        var installed = _channels.GetOrAdd(endpoint, candidate);
+        if (!ReferenceEquals(installed, candidate))
+        {
+            candidate.Dispose();
+        }
+        return installed;
+    }
+
private GrpcChannel CreateChannel(string endpoint) =>
GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
{
diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs
index c6a42b7d..7664b36c 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs
@@ -163,4 +163,53 @@ public class GrpcPullAuditEventsClientTests
Assert.Empty(result.Events);
Assert.False(result.MoreAvailable);
}
+
+    [Fact]
+    public async Task PullAsync_swallows_unexpected_faults_to_empty_response()
+    {
+        // I3(a): exercises the catch-all branch. The invoker raises a fault that
+        // is not a transport error (InvalidOperationException stands in for a
+        // mapping/protocol failure). Audit reconciliation is best-effort, so
+        // PullAsync must answer with an empty response; a throw would only be
+        // re-caught by the actor's per-site guard.
+        var faultingInvoker = FakeInvoker.Throwing(new InvalidOperationException("boom"));
+        var sites = new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083"));
+        var client = new GrpcPullAuditEventsClient(
+            sites, faultingInvoker, NullLogger.Instance);
+
+        var response = await client.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        Assert.Empty(response.Events);
+        Assert.False(response.MoreAvailable);
+    }
+
+    [Fact]
+    public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
+    {
+        // I3(b) / guards I2: a fresh reconciliation cursor is DateTime.MinValue
+        // with Kind=Unspecified. Under the system-wide "all timestamps are UTC"
+        // invariant EnsureUtc has to stamp it AS UTC without ToUniversalTime() —
+        // on a host with a positive UTC offset that underflows and Timestamp.FromDateTime
+        // throws ArgumentOutOfRangeException, crashing the FIRST pull for every site.
+        DateTime cursor = default; // DateTime.MinValue, Kind=Unspecified
+        Assert.Equal(DateTimeKind.Unspecified, cursor.Kind);
+
+        var recordingInvoker = FakeInvoker.Returning(new ProtoPullResponse());
+        var sites = new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083"));
+        var client = new GrpcPullAuditEventsClient(
+            sites, recordingInvoker, NullLogger.Instance);
+
+        // The pull MUST NOT throw — it has to reach the invoker.
+        var response = await client.PullAsync("site-a", cursor, batchSize: 256, CancellationToken.None);
+
+        // Exactly one dial, against the configured endpoint, carrying the cursor
+        // verbatim as UTC MinValue (no local-TZ conversion).
+        Assert.Equal(1, recordingInvoker.CallCount);
+        Assert.Equal("http://site-a:8083", recordingInvoker.Endpoint);
+        var sentRequest = recordingInvoker.Request;
+        Assert.NotNull(sentRequest);
+        Assert.Equal(DateTime.MinValue, sentRequest!.SinceUtc.ToDateTime());
+        Assert.Empty(response.Events);
+        Assert.False(response.MoreAvailable);
+    }
}