feat(audit): M5.6 SourceNode sentinel backfill (purge-role) + CLI + runbook note (T5)

This commit is contained in:
Joseph Doherty
2026-06-16 22:02:21 -04:00
parent de2968b03d
commit 55630b48b6
12 changed files with 1399 additions and 10 deletions
@@ -0,0 +1,113 @@
using System.Text;
using System.Text.Json;
namespace ZB.MOM.WW.ScadaBridge.CLI.Commands;
/// <summary>
/// Option bag describing a single <c>audit backfill-source-node</c> run. The
/// values are copied verbatim into the JSON request body sent to the server.
/// </summary>
public sealed class AuditBackfillSourceNodeArgs
{
    /// <summary>
    /// Replacement value stored in <c>SourceNode</c> wherever it is currently
    /// NULL. Defaults to <c>"unknown"</c>.
    /// </summary>
    public string Sentinel { get; set; } = "unknown";

    /// <summary>
    /// Cut-off timestamp as text: only rows whose <c>OccurredAtUtc</c> is
    /// strictly earlier than this value are touched. Mandatory; expected to be
    /// an ISO-8601 UTC datetime. Defaults to the empty string until set.
    /// </summary>
    public string Before { get; set; } = "";

    /// <summary>
    /// Upper bound on the number of rows changed by one batch (default 5000).
    /// Keeps the per-transaction log footprint bounded; batches repeat until
    /// nothing is left to update.
    /// </summary>
    public int BatchSize { get; set; } = 5000;
}
/// <summary>
/// Pure helpers for the <c>audit backfill-source-node</c> subcommand (Audit Log
/// #23 M5.6 T5). Builds the request body, POSTs to
/// <c>/api/audit/backfill-source-node</c>, and renders the result. Kept separate
/// from the command wiring so each piece is unit-testable without standing up the
/// command tree.
/// </summary>
public static class AuditBackfillHelpers
{
    /// <summary>Path of the backfill endpoint, relative to the management base URL.</summary>
    private const string BackfillPath = "api/audit/backfill-source-node";

    /// <summary>
    /// Builds the JSON request body for <c>POST /api/audit/backfill-source-node</c>.
    /// The body carries three properties: <c>sentinel</c>, <c>before</c> and
    /// <c>batchSize</c>, copied verbatim from <paramref name="args"/>.
    /// </summary>
    /// <param name="args">The backfill arguments.</param>
    /// <returns>A JSON string suitable for the request body.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="args"/> is <c>null</c>.</exception>
    public static string BuildRequestBody(AuditBackfillSourceNodeArgs args)
    {
        ArgumentNullException.ThrowIfNull(args);

        var payload = new
        {
            sentinel = args.Sentinel,
            before = args.Before,
            batchSize = args.BatchSize,
        };
        return JsonSerializer.Serialize(payload);
    }

    /// <summary>
    /// Executes the backfill: POSTs <c>/api/audit/backfill-source-node</c> and
    /// prints the result. Returns the process exit code (0 = success,
    /// 1 = error, 2 = authorization failure).
    /// </summary>
    /// <remarks>
    /// A successful response whose body is not the expected JSON object (not
    /// JSON at all, a non-object root, or properties of an unexpected type) is
    /// echoed verbatim and still counts as success: the server has already
    /// completed the backfill, so a rendering problem must not fail the command.
    /// </remarks>
    /// <param name="client">The management HTTP client.</param>
    /// <param name="args">The backfill arguments.</param>
    /// <param name="output">The output writer for results.</param>
    /// <returns>A task that resolves to the process exit code.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static async Task<int> RunBackfillAsync(
        ManagementHttpClient client,
        AuditBackfillSourceNodeArgs args,
        TextWriter output)
    {
        ArgumentNullException.ThrowIfNull(client);
        ArgumentNullException.ThrowIfNull(args);
        ArgumentNullException.ThrowIfNull(output);

        var body = BuildRequestBody(args);

        // The server loops over batches until nothing is left, so allow a generous timeout.
        var response = await client.SendPostAsync(BackfillPath, body, TimeSpan.FromMinutes(10));
        if (response.JsonData == null)
        {
            OutputFormatter.WriteError(
                response.Error ?? "Backfill request failed.", response.ErrorCode ?? "ERROR");
            return CommandHelpers.IsAuthorizationFailure(response) ? 2 : 1;
        }

        if (!TryWriteSummary(response.JsonData, args, output))
        {
            // Server returned success but an unexpected body — not expected; print raw.
            output.WriteLine(response.JsonData);
        }

        output.Flush();
        return 0;
    }

    /// <summary>
    /// Parses the success payload and writes the human-readable summary.
    /// Nothing is written unless the whole payload could be interpreted.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the summary was written; <c>false</c> when the payload is
    /// not a JSON object of the expected shape and the caller should fall back
    /// to printing it raw.
    /// </returns>
    private static bool TryWriteSummary(string json, AuditBackfillSourceNodeArgs args, TextWriter output)
    {
        long rowsUpdated = 0L;
        string sentinel;
        string before;

        try
        {
            using var doc = JsonDocument.Parse(json);
            var root = doc.RootElement;

            // TryGetProperty throws InvalidOperationException on anything but an object.
            if (root.ValueKind != JsonValueKind.Object)
            {
                return false;
            }

            // A missing count is reported as 0; a present but non-integer count
            // means the payload is not what we expect, so show it raw instead.
            if (root.TryGetProperty("rowsUpdated", out var rowsEl)
                && !(rowsEl.ValueKind == JsonValueKind.Number && rowsEl.TryGetInt64(out rowsUpdated)))
            {
                return false;
            }

            sentinel = ReadStringOrDefault(root, "sentinel", args.Sentinel);
            before = ReadStringOrDefault(root, "before", args.Before);
        }
        catch (JsonException)
        {
            return false;
        }

        output.WriteLine("SourceNode backfill complete.");
        output.WriteLine($" rows updated : {rowsUpdated}");
        output.WriteLine($" sentinel : {sentinel}");
        output.WriteLine($" before : {before}");
        return true;
    }

    /// <summary>
    /// Returns the named string property of <paramref name="root"/>, or
    /// <paramref name="fallback"/> when the property is absent, JSON null, or
    /// not a string.
    /// </summary>
    private static string ReadStringOrDefault(JsonElement root, string name, string fallback)
    {
        if (root.TryGetProperty(name, out var element) && element.ValueKind == JsonValueKind.String)
        {
            return element.GetString() ?? fallback;
        }

        return fallback;
    }
}
@@ -29,6 +29,7 @@ public static class AuditCommands
command.Add(BuildExport(urlOption, formatOption, usernameOption, passwordOption));
command.Add(BuildTree(urlOption, formatOption, usernameOption, passwordOption));
command.Add(BuildVerifyChain(urlOption, formatOption, usernameOption, passwordOption));
command.Add(BuildBackfillSourceNode(urlOption, formatOption, usernameOption, passwordOption));
return command;
}
@@ -288,4 +289,76 @@ public static class AuditCommands
});
return cmd;
}
/// <summary>
/// Builds the <c>audit backfill-source-node</c> sub-command (Audit Log #23 M5.6 T5).
/// Sets <c>SourceNode</c> on historical pre-feature rows whose <c>SourceNode IS NULL</c>
/// and <c>OccurredAtUtc</c> is older than <c>--before</c>, in batches. Admin-only.
/// </summary>
/// <remarks>
/// All three options are validated locally before any request is sent; every
/// validation failure writes an <c>INVALID_ARGUMENT</c> error and exits with 1.
/// </remarks>
/// <param name="urlOption">Shared option carrying the management URL.</param>
/// <param name="formatOption">Shared output-format option (not read by this sub-command).</param>
/// <param name="usernameOption">Shared option carrying the user name.</param>
/// <param name="passwordOption">Shared option carrying the password.</param>
/// <returns>The configured <c>backfill-source-node</c> command.</returns>
private static Command BuildBackfillSourceNode(Option<string> urlOption, Option<string> formatOption, Option<string> usernameOption, Option<string> passwordOption)
{
    var sentinelOption = new Option<string>("--sentinel")
    {
        Description = "Value to write for pre-feature rows whose node-of-origin is unknown (default: unknown)",
    };
    sentinelOption.DefaultValueFactory = _ => "unknown";

    var beforeOption = new Option<string>("--before")
    {
        Description = "ISO-8601 UTC datetime; only rows older than this date are eligible (required)",
        Required = true,
    };

    var batchOption = new Option<int>("--batch")
    {
        Description = "Max rows updated per batch (default: 5000)",
    };
    batchOption.DefaultValueFactory = _ => 5000;

    var cmd = new Command("backfill-source-node")
    {
        Description = "Set SourceNode to a sentinel value on pre-feature rows where it is NULL (admin-only, maintenance path)",
    };
    cmd.Add(sentinelOption);
    cmd.Add(beforeOption);
    cmd.Add(batchOption);

    cmd.SetAction(async (ParseResult result) =>
    {
        var connection = AuditCommandHelpers.ResolveConnection(result, urlOption, usernameOption, passwordOption);
        if (connection.Error != null)
        {
            OutputFormatter.WriteError(connection.Error, connection.ErrorCode!);
            return 1;
        }

        var sentinel = result.GetValue(sentinelOption) ?? "unknown";
        var before = result.GetValue(beforeOption);
        var batch = result.GetValue(batchOption);

        if (string.IsNullOrWhiteSpace(sentinel))
        {
            OutputFormatter.WriteError("--sentinel must be a non-empty string.", "INVALID_ARGUMENT");
            return 1;
        }

        // Fail fast on an unparseable cut-off instead of paying a round trip for
        // a server-side 400. The parse call mirrors the one the endpoint applies
        // to the "before" field, so anything accepted here is accepted there.
        if (string.IsNullOrWhiteSpace(before)
            || !DateTime.TryParse(
                before,
                System.Globalization.CultureInfo.InvariantCulture,
                System.Globalization.DateTimeStyles.AssumeUniversal | System.Globalization.DateTimeStyles.AdjustToUniversal,
                out _))
        {
            OutputFormatter.WriteError(
                "--before must be an ISO-8601 UTC datetime (e.g. 2026-01-01T00:00:00Z).", "INVALID_ARGUMENT");
            return 1;
        }

        if (batch <= 0)
        {
            OutputFormatter.WriteError("--batch must be > 0.", "INVALID_ARGUMENT");
            return 1;
        }

        var args = new AuditBackfillSourceNodeArgs
        {
            Sentinel = sentinel,
            Before = before,
            BatchSize = batch,
        };

        using var client = new ManagementHttpClient(connection.Url!, connection.Username!, connection.Password!);
        return await AuditBackfillHelpers.RunBackfillAsync(client, args, Console.Out);
    });

    return cmd;
}
}
@@ -142,6 +142,60 @@ public class ManagementHttpClient : IDisposable
return new ManagementResponse((int)httpResponse.StatusCode, null, error, code);
}
/// <summary>
/// Issues a plain HTTP <c>POST</c> against a REST endpoint (e.g. the audit
/// maintenance endpoints) with a JSON body and returns the response. Unlike
/// <see cref="SendCommandAsync"/>, this does not wrap the call in the
/// <c>POST /management</c> command envelope — these are plain REST resources.
/// Authentication (HTTP Basic) and the base address are shared.
/// </summary>
/// <remarks>
/// The timeout covers both sending the request and reading the response body.
/// Expiry is reported as status 504 / <c>TIMEOUT</c>. A transport failure in
/// either phase is reported as status 0 / <c>CONNECTION_FAILED</c>. Neither
/// surfaces as an exception.
/// </remarks>
/// <param name="relativePath">Path relative to the base URL.</param>
/// <param name="body">The JSON body to send, or <c>null</c> for an empty body (sent as <c>{}</c>).</param>
/// <param name="timeout">The request timeout.</param>
/// <returns>A management response containing status and data.</returns>
public async Task<ManagementResponse> SendPostAsync(string relativePath, string? body, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    using var content = new StringContent(body ?? "{}", Encoding.UTF8, "application/json");

    int statusCode;
    bool succeeded;
    string responseBody;
    try
    {
        // Capture everything needed from the response inside the try so the
        // response can be disposed, and so a timeout or connection drop while
        // reading the body gets the same structured result as one while sending.
        using var httpResponse = await _httpClient.PostAsync(relativePath, content, cts.Token);
        statusCode = (int)httpResponse.StatusCode;
        succeeded = httpResponse.IsSuccessStatusCode;
        responseBody = await httpResponse.Content.ReadAsStringAsync(cts.Token);
    }
    catch (OperationCanceledException)
    {
        // Covers TaskCanceledException from HttpClient as well as cancellation
        // observed while reading the body.
        return new ManagementResponse(504, null, "Request timed out.", "TIMEOUT");
    }
    catch (HttpRequestException ex)
    {
        return new ManagementResponse(0, null, $"Connection failed: {ex.Message}", "CONNECTION_FAILED");
    }

    if (succeeded)
    {
        return new ManagementResponse(statusCode, responseBody, null, null);
    }

    // Failure: prefer the structured { "error": ..., "code": ... } payload and
    // fall back to the raw body when it is not JSON of that shape.
    string? error = null;
    string? code = null;
    try
    {
        using var doc = JsonDocument.Parse(responseBody);
        error = doc.RootElement.TryGetProperty("error", out var e) ? e.GetString() : responseBody;
        code = doc.RootElement.TryGetProperty("code", out var c) ? c.GetString() : null;
    }
    catch (Exception ex) when (ex is JsonException or InvalidOperationException)
    {
        // JsonException: body is not JSON. InvalidOperationException: the root is
        // not an object, or a property is not a string.
        error = responseBody;
    }

    return new ManagementResponse(statusCode, null, error, code);
}
/// <summary>
/// Issues a plain HTTP <c>GET</c> and returns the raw <see cref="HttpResponseMessage"/>
/// so the caller can stream the response body without buffering it in memory — used
@@ -201,4 +201,59 @@ public interface IAuditLogRepository
/// <param name="ct">Cancellation token.</param>
/// <returns>A task that resolves to the distinct, non-null source node names in ascending order.</returns>
Task<IReadOnlyList<string>> GetDistinctSourceNodesAsync(CancellationToken ct = default);
/// <summary>
/// M5.6 (T5) one-time operational backfill: sets <c>SourceNode</c> to
/// <paramref name="sentinel"/> on every row where <c>SourceNode IS NULL</c>
/// and <c>OccurredAtUtc &lt; <paramref name="before"/></c>, in bounded
/// batches of <paramref name="batchSize"/> rows, looping until no further
/// rows match. Returns the total number of rows updated across all batches.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why a sentinel, not the real value.</b> <c>SourceNode</c> captures the
/// physical cluster node on which an event was emitted. For pre-feature rows
/// that were ingested before the column was stamped, the true node-of-origin
/// is UNKNOWABLE — the original emitter is long gone and there is no
/// retroactive way to determine it. Backfilling a configurable sentinel
/// (default <c>"unknown"</c>) makes it explicit that these rows pre-date the
/// feature rather than silently leaving them NULL (which the filter UI already
/// treats as "unresolved" but which an operator might mistake for a bug).
/// </para>
/// <para>
/// <b><c>ExecutionId</c> / <c>ParentExecutionId</c> cannot be backfilled.</b>
/// These are PERSISTED COMPUTED columns derived from <c>DetailsJson</c>. The
/// AuditLog append-only invariant forbids mutating <c>DetailsJson</c>, so
/// the computed values for pre-feature rows remain NULL permanently. This is
/// documented rather than coded — see the Ops Note in
/// <c>Component-AuditLog.md § Ops Notes — Historical Null Columns</c>.
/// </para>
/// <para>
/// <b>Maintenance path — NOT the writer role.</b> This UPDATE runs on the
/// purge/maintenance connection (the same path as
/// <see cref="SwitchOutPartitionAsync"/> and any per-channel purge), NOT the
/// append-only <c>scadabridge_audit_writer</c> role. The CI guard
/// (<c>AuditLogAppendOnlyGuardTests</c>) recognises the
/// <c>// AUDIT-PURGE-ALLOWED</c> marker on the UPDATE line and forgives
/// exactly this one sanctioned maintenance-path UPDATE; any other UPDATE
/// against <c>AuditLog</c> still trips the guard.
/// </para>
/// <para>
/// <b>Bounded + idempotent.</b> <c>UPDATE TOP (@batch)</c> caps the
/// transaction-log and lock footprint per statement. The loop exits when a
/// batch updates zero rows, so a crash mid-loop is recoverable by simply
/// running again; re-running after completion is a no-op (no NULL rows
/// remain for the given <paramref name="before"/> window).
/// </para>
/// <para>
/// <b>Cancellation.</b> Batches that completed before cancellation was
/// observed stay applied; because the operation is idempotent, a later run
/// picks up the remaining rows.
/// </para>
/// </remarks>
/// <param name="sentinel">Value to write into <c>SourceNode</c> for pre-feature rows (e.g. <c>"unknown"</c>); must not be null, empty, or whitespace.</param>
/// <param name="before">Rows with <c>OccurredAtUtc</c> strictly older than this UTC datetime are eligible.</param>
/// <param name="batchSize">Maximum rows updated per batch; must be &gt; 0.</param>
/// <param name="ct">Cancellation token, checked before each batch.</param>
/// <returns>A task that resolves to the total number of rows updated across all batches.</returns>
/// <exception cref="ArgumentException">Thrown by the repository implementation when <paramref name="sentinel"/> is null, empty, or whitespace.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown by the repository implementation when <paramref name="batchSize"/> is zero or negative.</exception>
/// <exception cref="OperationCanceledException">Thrown when <paramref name="ct"/> is cancelled between batches.</exception>
Task<long> BackfillSourceNodeAsync(
string sentinel,
DateTime before,
int batchSize,
CancellationToken ct = default);
}
@@ -716,6 +716,102 @@ VALUES
.ToListAsync(ct);
}
/// <inheritdoc />
public async Task<long> BackfillSourceNodeAsync(
    string sentinel,
    DateTime before,
    int batchSize,
    CancellationToken ct = default)
{
    if (string.IsNullOrWhiteSpace(sentinel))
    {
        throw new ArgumentException("Sentinel must be a non-empty value.", nameof(sentinel));
    }

    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be > 0.");
    }

    // The contract states that `before` is a UTC instant. ToUniversalTime()
    // treats DateTimeKind.Unspecified as LOCAL time, which would silently move
    // the cut-off by the host's UTC offset; an unspecified value is therefore
    // stamped as UTC as-is. Local values are converted; Utc values pass through.
    var beforeUtc = before.Kind == DateTimeKind.Unspecified
        ? DateTime.SpecifyKind(before, DateTimeKind.Utc)
        : before.ToUniversalTime();

    // M5.6 (T5) SourceNode sentinel backfill. This is the ONE sanctioned UPDATE
    // against dbo.AuditLog in the codebase. It touches ONLY rows where
    // SourceNode IS NULL AND OccurredAtUtc < @before — rows that pre-date the
    // M5.6 feature and whose node-of-origin is UNKNOWABLE. The sentinel (default
    // "unknown") makes that explicit. ExecutionId/ParentExecutionId are PERSISTED
    // COMPUTED columns derived from DetailsJson — mutating DetailsJson is forbidden
    // under the append-only invariant, so those stay NULL on pre-feature rows.
    //
    // Maintenance path (NOT the writer role): runs on the same connection used for
    // SwitchOutPartitionAsync (partition-switch DDL), which requires a role that
    // holds UPDATE — the append-only scadabridge_audit_writer role has only
    // INSERT + SELECT.
    //
    // Bounded + idempotent: UPDATE TOP (@batch) caps the log/lock footprint per
    // statement; the loop exits when a batch updates 0 rows. Re-running after a
    // crash simply resumes where it left off.
    //
    // The trailing AUDIT-PURGE-ALLOWED marker on the UPDATE line below is the
    // single narrow exemption the append-only CI guard (AuditLogAppendOnlyGuardTests)
    // recognises for an UPDATE; any other UPDATE targeting AuditLog still trips the guard.
    const string updateBatchSql =
        "UPDATE TOP (@batch) dbo.AuditLog SET SourceNode = @sentinel WHERE SourceNode IS NULL AND OccurredAtUtc < @before;"; // AUDIT-PURGE-ALLOWED: SourceNode sentinel backfill (M5.6 T5), maintenance path

    long totalUpdated = 0;

    var conn = _context.Database.GetDbConnection();

    // Only close the connection if this method opened it; a connection that was
    // already open belongs to the caller / context.
    var openedHere = false;
    if (conn.State != System.Data.ConnectionState.Open)
    {
        await conn.OpenAsync(ct).ConfigureAwait(false);
        openedHere = true;
    }

    try
    {
        // The statement text and all three parameter values are loop-invariant,
        // so the command is built once and re-executed for every batch.
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = updateBatchSql;
        AddParameter(cmd, "@batch", batchSize);
        AddParameter(cmd, "@sentinel", sentinel);
        AddParameter(cmd, "@before", beforeUtc);

        while (true)
        {
            ct.ThrowIfCancellationRequested();

            var rows = await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
            if (rows <= 0)
            {
                // Nothing left to backfill for this window.
                break;
            }

            totalUpdated += rows;
        }
    }
    finally
    {
        if (openedHere)
        {
            await conn.CloseAsync().ConfigureAwait(false);
        }
    }

    return totalUpdated;

    // Adds one named parameter to the command.
    static void AddParameter(System.Data.Common.DbCommand command, string name, object value)
    {
        var parameter = command.CreateParameter();
        parameter.ParameterName = name;
        parameter.Value = value;
        command.Parameters.Add(parameter);
    }
}
/// <summary>
/// Splits a <c>STRING_AGG</c> comma-joined value into a distinct, ordered
/// list. A null/empty aggregate (a stub node with no rows) yields an empty
@@ -89,9 +89,16 @@ public static class AuditEndpoints
Converters = { new JsonStringEnumConverter() },
};
/// <summary>
/// Default sentinel written by the backfill endpoint when the caller omits
/// <c>sentinel</c> or supplies a blank value.
/// </summary>
public const string DefaultBackfillSentinel = "unknown";
/// <summary>
/// Default batch size for the backfill endpoint when the caller omits
/// <c>batchSize</c> or supplies a number that is not a positive 32-bit integer.
/// </summary>
public const int DefaultBackfillBatchSize = 5000;
/// <summary>
/// Registers the <c>/api/audit/query</c>, <c>/api/audit/export</c>, and
/// <c>/api/audit/tree</c> minimal-API endpoints.
/// Registers the <c>/api/audit/query</c>, <c>/api/audit/export</c>,
/// <c>/api/audit/tree</c>, and <c>POST /api/audit/backfill-source-node</c>
/// minimal-API endpoints.
/// </summary>
/// <param name="endpoints">The endpoint route builder to register routes on.</param>
/// <returns>The same <paramref name="endpoints"/> builder, for chaining.</returns>
@@ -100,6 +107,7 @@ public static class AuditEndpoints
endpoints.MapGet("/api/audit/query", (Delegate)HandleQuery);
endpoints.MapGet("/api/audit/export", (Delegate)HandleExport);
endpoints.MapGet("/api/audit/tree", (Delegate)HandleTree);
endpoints.MapPost("/api/audit/backfill-source-node", (Delegate)HandleBackfillSourceNode);
return endpoints;
}
@@ -279,6 +287,136 @@ public static class AuditEndpoints
return Results.Json(nodes, JsonOptions);
}
// ─────────────────────────────────────────────────────────────────────
// POST /api/audit/backfill-source-node
// ─────────────────────────────────────────────────────────────────────
/// <summary>
/// Handles <c>POST /api/audit/backfill-source-node</c>: authenticates (Admin role
/// required), reads the JSON body for <c>sentinel</c> / <c>before</c> /
/// <c>batchSize</c>, and calls
/// <see cref="IAuditLogRepository.BackfillSourceNodeAsync"/> on the maintenance
/// path.
///
/// <para>
/// <b>Auth.</b> Admin-only — backfilling the SourceNode column is a one-time ops
/// procedure that mutates the AuditLog table via the maintenance path (NOT the
/// append-only writer role). Restricted to <see cref="AuthorizationPolicies.AuditExportRoles"/>
/// (Administrator) so it is never accessible to Viewer-role users.
/// </para>
///
/// <para>
/// <b>Request body.</b>
/// <code>
/// {
///   "sentinel": "unknown",             // optional; default "unknown"
///   "before": "2026-01-01T00:00:00Z",  // required ISO-8601 UTC
///   "batchSize": 5000                  // optional; default 5000
/// }
/// </code>
/// </para>
///
/// <para>
/// <b>Validation.</b> The body must be a JSON object. <c>sentinel</c> must be a
/// string (blank or JSON null falls back to the default), <c>before</c> must be a
/// parseable datetime string, and <c>batchSize</c> must be a number (JSON null
/// or a value that is not a positive 32-bit integer falls back to the default).
/// Anything else is answered with 400 / <c>BAD_REQUEST</c> rather than surfacing
/// as an unhandled exception.
/// </para>
///
/// <para>
/// <b>Response (200).</b>
/// <code>{ "rowsUpdated": 12345, "sentinel": "unknown", "before": "2026-01-01T00:00:00Z" }</code>
/// </para>
/// </summary>
/// <param name="context">The HTTP context for the current request.</param>
/// <returns>
/// A task that resolves to the HTTP result (200 JSON, 400, 401, 403, or 499
/// when the client aborts the request).
/// </returns>
internal static async Task<IResult> HandleBackfillSourceNode(HttpContext context)
{
    var auth = await AuthenticateAsync(context);
    if (auth.Failure is not null)
    {
        return auth.Failure;
    }

    // Admin-only: backfilling is a one-time ops procedure on the maintenance path.
    if (!HasAnyRole(auth.User!, AuthorizationPolicies.AuditExportRoles))
    {
        return Forbidden("Administrator");
    }

    string bodyText;
    try
    {
        using var reader = new System.IO.StreamReader(context.Request.Body);
        bodyText = await reader.ReadToEndAsync(context.RequestAborted);
    }
    catch (OperationCanceledException)
    {
        return Cancelled();
    }

    var sentinel = DefaultBackfillSentinel;
    DateTime? beforeUtc = null;
    var batchSize = DefaultBackfillBatchSize;

    if (!string.IsNullOrWhiteSpace(bodyText))
    {
        try
        {
            using var doc = System.Text.Json.JsonDocument.Parse(bodyText);
            var root = doc.RootElement;

            // TryGetProperty throws on anything but an object (e.g. "[]" or "42").
            if (root.ValueKind != System.Text.Json.JsonValueKind.Object)
            {
                return BackfillBadRequest("Request body must be a JSON object.");
            }

            if (root.TryGetProperty("sentinel", out var sentinelEl))
            {
                switch (sentinelEl.ValueKind)
                {
                    case System.Text.Json.JsonValueKind.String:
                    {
                        var candidate = sentinelEl.GetString();
                        if (!string.IsNullOrWhiteSpace(candidate))
                        {
                            sentinel = candidate.Trim();
                        }

                        break;
                    }

                    case System.Text.Json.JsonValueKind.Null:
                        // Explicit null is treated like an omitted field.
                        break;

                    default:
                        return BackfillBadRequest("Invalid 'sentinel' value; expected a string.");
                }
            }

            if (root.TryGetProperty("before", out var beforeEl))
            {
                if (beforeEl.ValueKind == System.Text.Json.JsonValueKind.String
                    && DateTime.TryParse(
                        beforeEl.GetString(),
                        System.Globalization.CultureInfo.InvariantCulture,
                        System.Globalization.DateTimeStyles.AssumeUniversal | System.Globalization.DateTimeStyles.AdjustToUniversal,
                        out var parsed))
                {
                    beforeUtc = DateTime.SpecifyKind(parsed, DateTimeKind.Utc);
                }
                else
                {
                    return BackfillBadRequest("Invalid 'before' value; expected ISO-8601 UTC datetime.");
                }
            }

            if (root.TryGetProperty("batchSize", out var batchEl))
            {
                switch (batchEl.ValueKind)
                {
                    case System.Text.Json.JsonValueKind.Number:
                        // Out-of-range or non-positive numbers keep the default.
                        if (batchEl.TryGetInt32(out var requested) && requested > 0)
                        {
                            batchSize = requested;
                        }

                        break;

                    case System.Text.Json.JsonValueKind.Null:
                        break;

                    default:
                        return BackfillBadRequest("Invalid 'batchSize' value; expected a positive integer.");
                }
            }
        }
        catch (System.Text.Json.JsonException)
        {
            return BackfillBadRequest("Request body must be valid JSON.");
        }
    }

    if (beforeUtc is null)
    {
        return BackfillBadRequest("Required field 'before' (ISO-8601 UTC datetime) is missing.");
    }

    var repo = context.RequestServices.GetRequiredService<IAuditLogRepository>();

    long rowsUpdated;
    try
    {
        rowsUpdated = await repo.BackfillSourceNodeAsync(sentinel, beforeUtc.Value, batchSize, context.RequestAborted);
    }
    catch (OperationCanceledException) when (context.RequestAborted.IsCancellationRequested)
    {
        // Same outcome as an abort while reading the body. Batches already
        // applied stay applied; the backfill is idempotent and can be re-run.
        return Cancelled();
    }

    return Results.Json(new
    {
        rowsUpdated,
        sentinel,
        before = beforeUtc.Value.ToString("O", System.Globalization.CultureInfo.InvariantCulture),
    }, JsonOptions);

    // 400 response in the endpoint's { error, code } error shape.
    static IResult BackfillBadRequest(string message) =>
        Results.Json(new { error = message, code = "BAD_REQUEST" }, statusCode: 400);

    // 499 response used when the client aborted the request.
    static IResult Cancelled() =>
        Results.Json(new { error = "Request cancelled.", code = "CANCELLED" }, statusCode: 499);
}
/// <summary>
/// Streams every matching row as RFC 4180 CSV, paging the repository with its
/// keyset cursor and flushing after each page so a large export starts