fix(sitecallaudit): UpdatedAtUtc index + per-row pull resilience + UTC-convention + first-cycle test (review)
This commit is contained in:
@@ -41,8 +41,10 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
|||||||
/// <see cref="StatusCode.DeadlineExceeded"/>, <see cref="StatusCode.Cancelled"/>,
|
/// <see cref="StatusCode.DeadlineExceeded"/>, <see cref="StatusCode.Cancelled"/>,
|
||||||
/// bare <see cref="HttpRequestException"/> / <c>SocketException</c>) are caught
|
/// bare <see cref="HttpRequestException"/> / <c>SocketException</c>) are caught
|
||||||
/// and collapsed to an empty response so one offline site never sinks the rest
|
/// and collapsed to an empty response so one offline site never sinks the rest
|
||||||
/// of the reconciliation tick. Any other fault (e.g. a malformed reply that
|
/// of the reconciliation tick. Any other transport/protocol fault is also
|
||||||
/// fails DTO mapping) is also swallowed to empty: reconciliation is best-effort.
|
/// swallowed to empty: reconciliation is best-effort. Per-row DTO mapping faults
|
||||||
|
/// (e.g. a single unparseable <c>TrackedOperationId</c>) are narrower still —
|
||||||
|
/// the offending row is skipped+logged and the rest of the batch is returned.
|
||||||
/// </para>
|
/// </para>
|
||||||
/// <para>
|
/// <para>
|
||||||
/// <b>Testability.</b> The unary call is reached through the
|
/// <b>Testability.</b> The unary call is reached through the
|
||||||
@@ -138,15 +140,30 @@ public sealed class GrpcPullSiteCallsClient : IPullSiteCallsClient
|
|||||||
return Empty;
|
return Empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map proto DTOs to central SiteCall entities, re-stamp SourceSite from
|
// Map proto DTOs to central SiteCall entities PER-ROW so one malformed
|
||||||
// the dialed siteId (the site leaves it empty), and order oldest-first by
|
// operational (e.g. an unparseable TrackedOperationId) is skipped+logged
|
||||||
// UpdatedAtUtc (the wire is already ordered by the site read, but the
|
// rather than sinking the whole batch through the outer catch-all. Each
|
||||||
// contract is explicit, so sort defensively).
|
// survivor is re-stamped with SourceSite from the dialed siteId (the site
|
||||||
var siteCalls = reply.Operationals
|
// leaves it empty).
|
||||||
.Select(SiteCallDtoMapper.FromDto)
|
var siteCalls = new List<SiteCall>(reply.Operationals.Count);
|
||||||
.Select(sc => sc with { SourceSite = siteId })
|
foreach (var dto in reply.Operationals)
|
||||||
.OrderBy(sc => sc.UpdatedAtUtc)
|
{
|
||||||
.ToList();
|
try
|
||||||
|
{
|
||||||
|
var sc = SiteCallDtoMapper.FromDto(dto) with { SourceSite = siteId };
|
||||||
|
siteCalls.Add(sc);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex,
|
||||||
|
"PullSiteCalls dropped a malformed operational row from site {SiteId} (id='{Id}'); continuing with the rest of the batch.",
|
||||||
|
siteId, dto.TrackedOperationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Order oldest-first by UpdatedAtUtc (the wire is already ordered by the
|
||||||
|
// site read, but the contract is explicit, so sort defensively).
|
||||||
|
siteCalls.Sort((a, b) => a.UpdatedAtUtc.CompareTo(b.UpdatedAtUtc));
|
||||||
|
|
||||||
return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable);
|
return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -457,7 +457,9 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
|||||||
// sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
|
// sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
|
||||||
// i.e. "pull from the beginning of recorded history", which is the
|
// i.e. "pull from the beginning of recorded history", which is the
|
||||||
// intended behaviour for the very first reconciliation cycle.
|
// intended behaviour for the very first reconciliation cycle.
|
||||||
var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;
|
var since = request.SinceUtc is not null
|
||||||
|
? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
|
||||||
|
: DateTime.MinValue;
|
||||||
|
|
||||||
IReadOnlyList<AuditEvent> events;
|
IReadOnlyList<AuditEvent> events;
|
||||||
try
|
try
|
||||||
@@ -537,10 +539,10 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
|||||||
|
|
||||||
// since_utc defaults to DateTime.MinValue when the wrapper is absent —
|
// since_utc defaults to DateTime.MinValue when the wrapper is absent —
|
||||||
// i.e. "pull from the beginning of recorded history", the intended
|
// i.e. "pull from the beginning of recorded history", the intended
|
||||||
// behaviour for the very first reconciliation cycle. ToUniversalTime
|
// behaviour for the very first reconciliation cycle.
|
||||||
// is safe here (the wire value is always a real UTC Timestamp, never the
|
var since = request.SinceUtc is not null
|
||||||
// unspecified-MinValue the central client guards against on its side).
|
? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
|
||||||
var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;
|
: DateTime.MinValue;
|
||||||
|
|
||||||
IReadOnlyList<SiteCallOperational> operationals;
|
IReadOnlyList<SiteCallOperational> operationals;
|
||||||
try
|
try
|
||||||
|
|||||||
@@ -91,6 +91,8 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
|
|||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
|
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
|
||||||
ON OperationTracking (Status, UpdatedAtUtc);
|
ON OperationTracking (Status, UpdatedAtUtc);
|
||||||
|
CREATE INDEX IF NOT EXISTS IX_OperationTracking_UpdatedAt
|
||||||
|
ON OperationTracking (UpdatedAtUtc);
|
||||||
""";
|
""";
|
||||||
cmd.ExecuteNonQuery();
|
cmd.ExecuteNonQuery();
|
||||||
|
|
||||||
@@ -370,8 +372,10 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
|
|||||||
|
|
||||||
// SiteRuntime-024: like GetStatusAsync, the reconciliation pull opens a
|
// SiteRuntime-024: like GetStatusAsync, the reconciliation pull opens a
|
||||||
// fresh, ungated read connection so a long-running write never blocks
|
// fresh, ungated read connection so a long-running write never blocks
|
||||||
// central's PullSiteCalls. The query is a bounded, ordered scan over the
|
// central's PullSiteCalls. The query is a bounded, ordered scan served by
|
||||||
// (Status, UpdatedAtUtc) index range — UpdatedAtUtc is the cursor.
|
// the standalone IX_OperationTracking_UpdatedAt index — UpdatedAtUtc is
|
||||||
|
// the cursor. (The composite (Status, UpdatedAtUtc) index cannot satisfy a
|
||||||
|
// status-less UpdatedAtUtc range scan; this dedicated index does.)
|
||||||
await using var readConnection = new SqliteConnection(_connectionString);
|
await using var readConnection = new SqliteConnection(_connectionString);
|
||||||
await readConnection.OpenAsync(ct).ConfigureAwait(false);
|
await readConnection.OpenAsync(ct).ConfigureAwait(false);
|
||||||
|
|
||||||
@@ -390,9 +394,15 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
|
|||||||
ORDER BY UpdatedAtUtc ASC
|
ORDER BY UpdatedAtUtc ASC
|
||||||
LIMIT $batchSize;
|
LIMIT $batchSize;
|
||||||
""";
|
""";
|
||||||
cmd.Parameters.AddWithValue(
|
// Force UTC kind before formatting so the cursor's "o" text matches the
|
||||||
"$since",
|
// 'Z'-suffixed round-trip form the write path persists (DateTime.UtcNow
|
||||||
sinceUtc.ToString("o", CultureInfo.InvariantCulture));
|
// .ToString("o")). A first-cycle DateTime.MinValue arrives Unspecified —
|
||||||
|
// without this its "o" rendering would lack the 'Z', and the SQLite text
|
||||||
|
// compare against 'Z'-suffixed stored values would be subtly inconsistent.
|
||||||
|
var sinceText = DateTime
|
||||||
|
.SpecifyKind(sinceUtc, DateTimeKind.Utc)
|
||||||
|
.ToString("o", CultureInfo.InvariantCulture);
|
||||||
|
cmd.Parameters.AddWithValue("$since", sinceText);
|
||||||
cmd.Parameters.AddWithValue("$batchSize", batchSize);
|
cmd.Parameters.AddWithValue("$batchSize", batchSize);
|
||||||
|
|
||||||
var rows = new List<SiteCallOperational>();
|
var rows = new List<SiteCallOperational>();
|
||||||
|
|||||||
@@ -186,6 +186,42 @@ public class GrpcPullSiteCallsClientTests
|
|||||||
Assert.False(result.MoreAvailable);
|
Assert.False(result.MoreAvailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task PullAsync_skips_poison_row_and_returns_the_good_rows()
|
||||||
|
{
|
||||||
|
// Poison-row resilience: one malformed operational (an unparseable
|
||||||
|
// TrackedOperationId fails SiteCallDtoMapper.FromDto → Guid.Parse) must be
|
||||||
|
// skipped+logged PER ROW rather than sinking the whole batch through the
|
||||||
|
// outer catch-all. The two good rows survive, re-stamped + oldest-first.
|
||||||
|
var older = Guid.NewGuid();
|
||||||
|
var newer = Guid.NewGuid();
|
||||||
|
|
||||||
|
var proto = new ProtoPullResponse { MoreAvailable = false };
|
||||||
|
proto.Operationals.Add(Dto(newer, BaseTime.AddMinutes(5)));
|
||||||
|
// Malformed row in the middle of the batch.
|
||||||
|
var bad = Dto(Guid.NewGuid(), BaseTime.AddMinutes(2));
|
||||||
|
bad.TrackedOperationId = "not-a-guid";
|
||||||
|
proto.Operationals.Add(bad);
|
||||||
|
proto.Operationals.Add(Dto(older, BaseTime));
|
||||||
|
|
||||||
|
var invoker = FakeInvoker.Returning(proto);
|
||||||
|
var sut = new GrpcPullSiteCallsClient(
|
||||||
|
new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
|
||||||
|
invoker,
|
||||||
|
NullLogger<GrpcPullSiteCallsClient>.Instance);
|
||||||
|
|
||||||
|
// Must NOT throw — the bad row is dropped, the good rows are returned.
|
||||||
|
var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(2, result.SiteCalls.Count);
|
||||||
|
// Survivors are oldest-first and SourceSite re-stamped from the dialed siteId.
|
||||||
|
Assert.Equal(older, result.SiteCalls[0].TrackedOperationId.Value);
|
||||||
|
Assert.Equal(newer, result.SiteCalls[1].TrackedOperationId.Value);
|
||||||
|
Assert.Equal("site-a", result.SiteCalls[0].SourceSite);
|
||||||
|
Assert.Equal("site-a", result.SiteCalls[1].SourceSite);
|
||||||
|
Assert.False(result.MoreAvailable);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
|
public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -121,6 +121,38 @@ public class SiteStreamPullSiteCallsTests : TestKit
|
|||||||
Assert.Equal(since, capturedSince);
|
Assert.Equal(since, capturedSince);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task PullSiteCalls_SinceUtcUnset_PassesDateTimeMinValue()
|
||||||
|
{
|
||||||
|
// First reconciliation cycle: central has no cursor yet, so the request's
|
||||||
|
// SinceUtc wrapper is absent (null). The handler must default to
|
||||||
|
// DateTime.MinValue ("pull from the beginning of recorded history")
|
||||||
|
// without a null-deref — this proves the very first cycle doesn't crash.
|
||||||
|
var store = Substitute.For<IOperationTrackingStore>();
|
||||||
|
var captured = new DateTime(2099, 1, 1, 0, 0, 0, DateTimeKind.Utc); // sentinel
|
||||||
|
store.ReadChangedSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(call =>
|
||||||
|
{
|
||||||
|
captured = call.ArgAt<DateTime>(0);
|
||||||
|
return (IReadOnlyList<SiteCallOperational>)Array.Empty<SiteCallOperational>();
|
||||||
|
});
|
||||||
|
|
||||||
|
var server = CreateServer();
|
||||||
|
server.SetOperationTrackingStore(store);
|
||||||
|
|
||||||
|
// SinceUtc intentionally left unset (null) — the proto wrapper is absent.
|
||||||
|
var request = new PullSiteCallsRequest
|
||||||
|
{
|
||||||
|
BatchSize = 100,
|
||||||
|
};
|
||||||
|
|
||||||
|
var response = await server.PullSiteCalls(request, NewContext());
|
||||||
|
|
||||||
|
Assert.Empty(response.Operationals);
|
||||||
|
Assert.False(response.MoreAvailable);
|
||||||
|
Assert.Equal(DateTime.MinValue, captured);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task PullSiteCalls_BatchSize3_Returns3Rows_MoreAvailableTrue()
|
public async Task PullSiteCalls_BatchSize3_Returns3Rows_MoreAvailableTrue()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user