merge: integrate WaitAsync/M5-audit (parallel session) with galaxy array-write + inbound-timeout fixes

This commit is contained in:
Joseph Doherty
2026-06-17 09:28:15 -04:00
88 changed files with 7714 additions and 169 deletions
@@ -35,4 +35,9 @@ public sealed class CommunicationServiceInstanceRouter : IInstanceRouter
public Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken) =>
_communicationService.RouteToSetAttributesAsync(siteId, request, cancellationToken);
/// <inheritdoc />
public Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken) =>
_communicationService.RouteToWaitForAttributeAsync(siteId, request, cancellationToken);
}
@@ -34,4 +34,12 @@ public interface IInstanceRouter
/// <returns>A task that resolves to the set-attributes response from the target site.</returns>
Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken);
/// <summary>Routes a wait-for-attribute request to the specified site (spec §6).</summary>
/// <param name="siteId">Target site identifier.</param>
/// <param name="request">The wait-for-attribute request to route (value-equality only).</param>
/// <param name="cancellationToken">Cancellation token for the routed call.</param>
/// <returns>A task that resolves to the wait-for-attribute response from the target site.</returns>
Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken);
}
@@ -6,6 +6,7 @@ using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
@@ -95,6 +96,7 @@ public sealed class AuditWriteMiddleware
private readonly ILogger<AuditWriteMiddleware> _logger;
private readonly IOptionsMonitor<AuditLogOptions> _options;
private readonly IAuditActorAccessor? _actorAccessor;
private readonly IAuditInboundCeilingHitsCounter _ceilingHitsCounter;
/// <summary>
/// Initializes the middleware with its required dependencies.
@@ -110,18 +112,26 @@ public sealed class AuditWriteMiddleware
/// construct the middleware; when absent, actor resolution falls back to the
/// stashed API-key name only.
/// </param>
/// <param name="ceilingHitsCounter">
/// M5.3 (T7, optional): incremented whenever an inbound request or response
/// body is truncated at <see cref="AuditLogOptions.InboundMaxBytes"/>. Optional
/// so existing tests and composition roots without the central health snapshot
/// wired still construct without the counter; a NoOp is used when absent.
/// </param>
public AuditWriteMiddleware(
RequestDelegate next,
ICentralAuditWriter auditWriter,
ILogger<AuditWriteMiddleware> logger,
IOptionsMonitor<AuditLogOptions> options,
IAuditActorAccessor? actorAccessor = null)
IAuditActorAccessor? actorAccessor = null,
IAuditInboundCeilingHitsCounter? ceilingHitsCounter = null)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options));
_actorAccessor = actorAccessor;
_ceilingHitsCounter = ceilingHitsCounter ?? new NoOpAuditInboundCeilingHitsCounter();
}
/// <summary>
@@ -133,9 +143,11 @@ public sealed class AuditWriteMiddleware
{
var sw = Stopwatch.StartNew();
// Per-request hot read of the inbound cap so a live config change
// Per-request hot read of the options snapshot so a live config change
// picks up on the next request without re-resolving the singleton.
var cap = _options.CurrentValue.InboundMaxBytes;
// InboundMaxBytes is read once here and passed to the capture helpers.
var opts = _options.CurrentValue;
var cap = opts.InboundMaxBytes;
// Audit Log #23 (ParentExecutionId): mint the inbound request's per-request
// ExecutionId ONCE, here at the start of the request, and stash it on
@@ -163,9 +175,20 @@ public sealed class AuditWriteMiddleware
// ReadBufferedRequestBodyAsync's own ContentLength is 0 short-circuit
// returns (null, false) for the bodyless case anyway, so the audit row
// is unchanged.
//
// M5.3 (T7): check if the matched method/target has SkipBodyCapture set.
// The route value is resolved BEFORE the pipeline runs (route matching
// has already bound {methodName} at this point), so we can skip the
// EnableBuffering allocation and body read up front.
var methodNameForOverride = ctx.Request.RouteValues.TryGetValue("methodName", out var rv)
&& rv is string mn && !string.IsNullOrWhiteSpace(mn) ? mn : null;
var skipBody = methodNameForOverride != null
&& opts.PerTargetOverrides.TryGetValue(methodNameForOverride, out var perTarget)
&& perTarget.SkipBodyCapture;
var requestBody = (string?)null;
var requestTruncated = false;
if (RequestHasBody(ctx.Request))
if (!skipBody && RequestHasBody(ctx.Request))
{
ctx.Request.EnableBuffering();
(requestBody, requestTruncated) =
@@ -200,15 +223,25 @@ public sealed class AuditWriteMiddleware
// The forwarding wrapper has already written every byte to the
// original sink; this just pulls back the bounded UTF-8 string.
ctx.Response.Body = originalResponseBody;
var (responseBody, responseTruncated) = captureStream.GetCapturedBody();
var (capturedResponseBody, capturedResponseTruncated) = captureStream.GetCapturedBody();
// M5.3 (T7): if SkipBodyCapture is set, discard the captured response
// body (the request body was never captured above). The row + headers
// still emit with null RequestSummary / ResponseSummary.
// Truncation flags are also cleared so ceiling-hit counter is not
// bumped for methods that deliberately opt out of body capture.
var responseBody = skipBody ? null : capturedResponseBody;
var responseTruncated = skipBody ? false : capturedResponseTruncated;
EmitInboundAudit(
ctx,
opts,
sw.ElapsedMilliseconds,
thrown,
requestBody,
responseBody,
requestTruncated || responseTruncated);
requestTruncated || responseTruncated,
requestTruncated,
responseTruncated);
}
}
@@ -219,11 +252,14 @@ public sealed class AuditWriteMiddleware
/// </summary>
private void EmitInboundAudit(
HttpContext ctx,
AuditLogOptions opts,
long durationMs,
Exception? thrown,
string? requestBody,
string? responseBody,
bool payloadTruncated)
bool payloadTruncated,
bool requestTruncated = false,
bool responseTruncated = false)
{
try
{
@@ -243,10 +279,43 @@ public sealed class AuditWriteMiddleware
var actor = isAuthFailure ? null : ResolveActor(ctx);
var methodName = ResolveMethodName(ctx);
// M5.3 (T7): increment the ceiling-hits counter once per request
// that hit the cap on EITHER the request or response body.
if (requestTruncated || responseTruncated)
{
try { _ceilingHitsCounter.Increment(); } catch { /* swallow per §7 */ }
}
// M5.3 (T7): capture request headers into Extra JSON alongside the
// existing remoteIp / userAgent provenance fields. The header
// collection is run through the SAME header-redaction list
// (AuditLogOptions.HeaderRedactList) that the ScadaBridgeAuditRedactor
// applies to RequestSummary / ResponseSummary — auth/sensitive
// headers are redacted before they land in the row. Uses the SAME
// options snapshot captured at request start (passed in as opts) as
// the SkipBodyCapture / PerTargetOverrides decisions, so a mid-request
// live-reload can't split the body-capture and header-redaction
// verdicts across two different snapshots.
var redactSet = new HashSet<string>(
opts.HeaderRedactList,
StringComparer.OrdinalIgnoreCase);
var headerDict = new Dictionary<string, string>(StringComparer.Ordinal);
foreach (var header in ctx.Request.Headers)
{
// Redact headers whose name appears in the HeaderRedactList —
// the same "<redacted>" marker used by ScadaBridgeAuditRedactor.
var value = redactSet.Contains(header.Key)
? "<redacted>"
: header.Value.ToString();
headerDict[header.Key] = value;
}
var extra = JsonSerializer.Serialize(new
{
remoteIp = ctx.Connection.RemoteIpAddress?.ToString(),
userAgent = ctx.Request.Headers.UserAgent.ToString(),
requestHeaders = headerDict,
});
var evt = ScadaBridgeAuditEventFactory.Create(
@@ -205,6 +205,47 @@ public class RouteTarget
return response.Values;
}
/// <summary>
/// Blocks until a remote instance attribute reaches <paramref name="targetValue"/>
/// or <paramref name="timeout"/> elapses (spec §6). Value-equality ONLY across the
/// wire: the target is canonically encoded via <see cref="AttributeValueCodec"/> and
/// the site evaluates equality — there is no predicate and no quality flag in the
/// comparison.
/// </summary>
/// <param name="attributeName">Name of the attribute to wait on.</param>
/// <param name="targetValue">Target value the attribute must equal for the wait to match.</param>
/// <param name="timeout">Maximum time to wait for the attribute to reach the target value.</param>
/// <param name="cancellationToken">Optional cancellation token; defaults to the method deadline.</param>
/// <returns>A task that resolves to <c>true</c> if the attribute reached the target value, <c>false</c> if the wait timed out.</returns>
public async Task<bool> WaitForAttribute(
string attributeName,
object? targetValue,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
var token = Effective(cancellationToken);
var siteId = await ResolveSiteAsync(token);
// Audit Log #23 (ParentExecutionId): mirrors the Call path — stamp the
// spawning inbound request's ExecutionId so future site-side audit
// emission for routed waits can record this wait's parent. CorrelationId
// is the per-operation lifecycle id, freshly minted per routed wait.
var request = new RouteToWaitForAttributeRequest(
Guid.NewGuid().ToString(), _instanceCode, attributeName,
AttributeValueCodec.Encode(targetValue), timeout, DateTimeOffset.UtcNow,
_parentExecutionId);
var response = await _instanceRouter.RouteToWaitForAttributeAsync(siteId, request, token);
if (!response.Success)
{
throw new InvalidOperationException(
response.ErrorMessage ?? "Remote attribute wait failed");
}
return response.Matched;
}
/// <summary>
/// Sets a single attribute value on the remote instance.
/// </summary>