diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs index b483b1a5..350ee1ac 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs @@ -41,8 +41,10 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central; /// , , /// bare / SocketException) are caught /// and collapsed to an empty response so one offline site never sinks the rest -/// of the reconciliation tick. Any other fault (e.g. a malformed reply that -/// fails DTO mapping) is also swallowed to empty: reconciliation is best-effort. +/// of the reconciliation tick. Any other transport/protocol fault is also +/// swallowed to empty: reconciliation is best-effort. Per-row DTO mapping faults +/// (e.g. a single unparseable TrackedOperationId) are narrower still — +/// the offending row is skipped+logged and the rest of the batch is returned. /// /// /// Testability. The unary call is reached through the @@ -138,15 +140,30 @@ public sealed class GrpcPullSiteCallsClient : IPullSiteCallsClient return Empty; } - // Map proto DTOs to central SiteCall entities, re-stamp SourceSite from - // the dialed siteId (the site leaves it empty), and order oldest-first by - // UpdatedAtUtc (the wire is already ordered by the site read, but the - // contract is explicit, so sort defensively). - var siteCalls = reply.Operationals - .Select(SiteCallDtoMapper.FromDto) - .Select(sc => sc with { SourceSite = siteId }) - .OrderBy(sc => sc.UpdatedAtUtc) - .ToList(); + // Map proto DTOs to central SiteCall entities PER-ROW so one malformed + // operational (e.g. an unparseable TrackedOperationId) is skipped+logged + // rather than sinking the whole batch through the outer catch-all. Each + // survivor is re-stamped with SourceSite from the dialed siteId (the site + // leaves it empty). + var siteCalls = new List(reply.Operationals.Count); + foreach (var dto in reply.Operationals) + { + try + { + var sc = SiteCallDtoMapper.FromDto(dto) with { SourceSite = siteId }; + siteCalls.Add(sc); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "PullSiteCalls dropped a malformed operational row from site {SiteId} (id='{Id}'); continuing with the rest of the batch.", + siteId, dto.TrackedOperationId); + } + } + + // Order oldest-first by UpdatedAtUtc (the wire is already ordered by the + // site read, but the contract is explicit, so sort defensively). + siteCalls.Sort((a, b) => a.UpdatedAtUtc.CompareTo(b.UpdatedAtUtc)); return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable); } diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs index f9d9aec0..7aedd140 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs @@ -457,7 +457,9 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase // sinceUtc defaults to DateTime.MinValue when the wrapper is absent — // i.e. "pull from the beginning of recorded history", which is the // intended behaviour for the very first reconciliation cycle. - var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue; + var since = request.SinceUtc is not null + ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc) + : DateTime.MinValue; IReadOnlyList events; try @@ -537,10 +539,10 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase // since_utc defaults to DateTime.MinValue when the wrapper is absent — // i.e. "pull from the beginning of recorded history", the intended - // behaviour for the very first reconciliation cycle. ToUniversalTime - // is safe here (the wire value is always a real UTC Timestamp, never the - // unspecified-MinValue the central client guards against on its side). - var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue; + // behaviour for the very first reconciliation cycle. + var since = request.SinceUtc is not null + ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc) + : DateTime.MinValue; IReadOnlyList operationals; try diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs index b3e48c54..c48964b9 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs @@ -91,6 +91,8 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, ); CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated ON OperationTracking (Status, UpdatedAtUtc); + CREATE INDEX IF NOT EXISTS IX_OperationTracking_UpdatedAt + ON OperationTracking (UpdatedAtUtc); """; cmd.ExecuteNonQuery(); @@ -370,8 +372,10 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, // SiteRuntime-024: like GetStatusAsync, the reconciliation pull opens a // fresh, ungated read connection so a long-running write never blocks - // central's PullSiteCalls. The query is a bounded, ordered scan over the - // (Status, UpdatedAtUtc) index range — UpdatedAtUtc is the cursor. + // central's PullSiteCalls. The query is a bounded, ordered scan served by + // the standalone IX_OperationTracking_UpdatedAt index — UpdatedAtUtc is + // the cursor. (The composite (Status, UpdatedAtUtc) index cannot satisfy a + // status-less UpdatedAtUtc range scan; this dedicated index does.) await using var readConnection = new SqliteConnection(_connectionString); await readConnection.OpenAsync(ct).ConfigureAwait(false); @@ -390,9 +394,15 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, ORDER BY UpdatedAtUtc ASC LIMIT $batchSize; """; - cmd.Parameters.AddWithValue( - "$since", - sinceUtc.ToString("o", CultureInfo.InvariantCulture)); + // Force UTC kind before formatting so the cursor's "o" text matches the + // 'Z'-suffixed round-trip form the write path persists (DateTime.UtcNow + // .ToString("o")). A first-cycle DateTime.MinValue arrives Unspecified — + // without this its "o" rendering would lack the 'Z', and the SQLite text + // compare against 'Z'-suffixed stored values would be subtly inconsistent. + var sinceText = DateTime + .SpecifyKind(sinceUtc, DateTimeKind.Utc) + .ToString("o", CultureInfo.InvariantCulture); + cmd.Parameters.AddWithValue("$since", sinceText); cmd.Parameters.AddWithValue("$batchSize", batchSize); var rows = new List(); diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs index 982a9923..650b4a15 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs @@ -186,6 +186,42 @@ public class GrpcPullSiteCallsClientTests Assert.False(result.MoreAvailable); } + [Fact] + public async Task PullAsync_skips_poison_row_and_returns_the_good_rows() + { + // Poison-row resilience: one malformed operational (an unparseable + // TrackedOperationId fails SiteCallDtoMapper.FromDto → Guid.Parse) must be + // skipped+logged PER ROW rather than sinking the whole batch through the + // outer catch-all. The two good rows survive, re-stamped + oldest-first. + var older = Guid.NewGuid(); + var newer = Guid.NewGuid(); + + var proto = new ProtoPullResponse { MoreAvailable = false }; + proto.Operationals.Add(Dto(newer, BaseTime.AddMinutes(5))); + // Malformed row in the middle of the batch. + var bad = Dto(Guid.NewGuid(), BaseTime.AddMinutes(2)); + bad.TrackedOperationId = "not-a-guid"; + proto.Operationals.Add(bad); + proto.Operationals.Add(Dto(older, BaseTime)); + + var invoker = FakeInvoker.Returning(proto); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + // Must NOT throw — the bad row is dropped, the good rows are returned. + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Equal(2, result.SiteCalls.Count); + // Survivors are oldest-first and SourceSite re-stamped from the dialed siteId. + Assert.Equal(older, result.SiteCalls[0].TrackedOperationId.Value); + Assert.Equal(newer, result.SiteCalls[1].TrackedOperationId.Value); + Assert.Equal("site-a", result.SiteCalls[0].SourceSite); + Assert.Equal("site-a", result.SiteCalls[1].SourceSite); + Assert.False(result.MoreAvailable); + } + [Fact] public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials() { diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs index e6b2bb99..45104a84 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs @@ -121,6 +121,38 @@ public class SiteStreamPullSiteCallsTests : TestKit Assert.Equal(since, capturedSince); } + [Fact] + public async Task PullSiteCalls_SinceUtcUnset_PassesDateTimeMinValue() + { + // First reconciliation cycle: central has no cursor yet, so the request's + // SinceUtc wrapper is absent (null). The handler must default to + // DateTime.MinValue ("pull from the beginning of recorded history") + // without a null-deref — this proves the very first cycle doesn't crash. + var store = Substitute.For(); + var captured = new DateTime(2099, 1, 1, 0, 0, 0, DateTimeKind.Utc); // sentinel + store.ReadChangedSinceAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(call => + { + captured = call.ArgAt(0); + return (IReadOnlyList)Array.Empty(); + }); + + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + // SinceUtc intentionally left unset (null) — the proto wrapper is absent. + var request = new PullSiteCallsRequest + { + BatchSize = 100, + }; + + var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Empty(response.Operationals); + Assert.False(response.MoreAvailable); + Assert.Equal(DateTime.MinValue, captured); + } + [Fact] public async Task PullSiteCalls_BatchSize3_Returns3Rows_MoreAvailableTrue() {