feat(inbound): routed Route.To().WaitForAttribute — contract + central path (spec §6)
This commit is contained in:
@@ -83,3 +83,46 @@ public record RouteToSetAttributesResponse(
|
||||
bool Success,
|
||||
string? ErrorMessage,
|
||||
DateTimeOffset Timestamp);
|
||||
|
||||
/// <summary>
|
||||
/// Request to block until a remote instance attribute reaches a target value
|
||||
/// (spec §6 — <c>Route.To("inst").WaitForAttribute(name, targetValue, timeout)</c>).
|
||||
/// Value-equality ONLY across the wire: <see cref="TargetValueEncoded"/> carries the
|
||||
/// canonical <c>AttributeValueCodec</c>-encoded target; there is no predicate and no
|
||||
/// quality flag in the comparison. The site evaluates equality and either matches or
|
||||
/// times out.
|
||||
/// </summary>
|
||||
/// <param name="ParentExecutionId">
|
||||
/// Audit Log #23 (ParentExecutionId): mirrors <see cref="RouteToCallRequest.ParentExecutionId"/>.
|
||||
/// For an inbound-API-routed wait this is the inbound request's per-request execution id;
|
||||
/// future site-side audit emission for routed waits can stamp it as <c>ParentExecutionId</c>
|
||||
/// so the inbound→site execution-tree link survives the wait path. Additive trailing
|
||||
/// member — null for the Central UI sandbox path or for callers built before the field existed.
|
||||
/// </param>
|
||||
public record RouteToWaitForAttributeRequest(
|
||||
string CorrelationId,
|
||||
string InstanceUniqueName,
|
||||
string AttributeName,
|
||||
string? TargetValueEncoded,
|
||||
TimeSpan Timeout,
|
||||
DateTimeOffset Timestamp,
|
||||
Guid? ParentExecutionId = null);
|
||||
|
||||
/// <summary>
|
||||
/// Response from a remote attribute wait. <see cref="Success"/>/<see cref="ErrorMessage"/>
|
||||
/// convey the routing-level outcome (e.g. instance-not-found); <see cref="Matched"/>,
|
||||
/// <see cref="TimedOut"/>, <see cref="Value"/>, and <see cref="Quality"/> convey the wait
|
||||
/// outcome itself. When <see cref="Success"/> is <c>true</c>, exactly one of
|
||||
/// <see cref="Matched"/>/<see cref="TimedOut"/> holds: <see cref="Matched"/> means the
|
||||
/// attribute reached the target value (with <see cref="Value"/>/<see cref="Quality"/>
|
||||
/// captured at the match), <see cref="TimedOut"/> means the deadline elapsed first.
|
||||
/// </summary>
|
||||
public record RouteToWaitForAttributeResponse(
|
||||
string CorrelationId,
|
||||
bool Matched,
|
||||
object? Value,
|
||||
string? Quality,
|
||||
bool TimedOut,
|
||||
bool Success,
|
||||
string? ErrorMessage,
|
||||
DateTimeOffset Timestamp);
|
||||
|
||||
@@ -445,6 +445,25 @@ public class CommunicationService
|
||||
envelope, _options.IntegrationTimeout, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Routes an inbound API wait-for-attribute request to a site (spec §6).
|
||||
/// </summary>
|
||||
/// <param name="siteId">The target site identifier.</param>
|
||||
/// <param name="request">The wait-for-attribute route request.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>The wait-for-attribute route response.</returns>
|
||||
public async Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
|
||||
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var envelope = new SiteEnvelope(siteId, request);
|
||||
// A wait legitimately blocks up to request.Timeout on the site, so the cluster
|
||||
// Ask must be bounded by the WAIT deadline (plus integration-timeout slack for
|
||||
// the round trip), not the generic IntegrationTimeout used by the other routes.
|
||||
var askTimeout = request.Timeout + _options.IntegrationTimeout;
|
||||
return await GetActor().Ask<RouteToWaitForAttributeResponse>(
|
||||
envelope, askTimeout, cancellationToken);
|
||||
}
|
||||
|
||||
// ── Notification Outbox (central-local actor — Asked directly, no SiteEnvelope) ──
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -35,4 +35,9 @@ public sealed class CommunicationServiceInstanceRouter : IInstanceRouter
|
||||
public Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
|
||||
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken) =>
|
||||
_communicationService.RouteToSetAttributesAsync(siteId, request, cancellationToken);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
|
||||
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken) =>
|
||||
_communicationService.RouteToWaitForAttributeAsync(siteId, request, cancellationToken);
|
||||
}
|
||||
|
||||
@@ -34,4 +34,12 @@ public interface IInstanceRouter
|
||||
/// <returns>A task that resolves to the set-attributes response from the target site.</returns>
|
||||
Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
|
||||
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Routes a wait-for-attribute request to the specified site (spec §6).</summary>
|
||||
/// <param name="siteId">Target site identifier.</param>
|
||||
/// <param name="request">The wait-for-attribute request to route (value-equality only).</param>
|
||||
/// <param name="cancellationToken">Cancellation token for the routed call.</param>
|
||||
/// <returns>A task that resolves to the wait-for-attribute response from the target site.</returns>
|
||||
Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
|
||||
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
@@ -205,6 +205,47 @@ public class RouteTarget
|
||||
return response.Values;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Blocks until a remote instance attribute reaches <paramref name="targetValue"/>
|
||||
/// or <paramref name="timeout"/> elapses (spec §6). Value-equality ONLY across the
|
||||
/// wire: the target is canonically encoded via <see cref="AttributeValueCodec"/> and
|
||||
/// the site evaluates equality — there is no predicate and no quality flag in the
|
||||
/// comparison.
|
||||
/// </summary>
|
||||
/// <param name="attributeName">Name of the attribute to wait on.</param>
|
||||
/// <param name="targetValue">Target value the attribute must equal for the wait to match.</param>
|
||||
/// <param name="timeout">Maximum time to wait for the attribute to reach the target value.</param>
|
||||
/// <param name="cancellationToken">Optional cancellation token; defaults to the method deadline.</param>
|
||||
/// <returns>A task that resolves to <c>true</c> if the attribute reached the target value, <c>false</c> if the wait timed out.</returns>
|
||||
public async Task<bool> WaitForAttribute(
|
||||
string attributeName,
|
||||
object? targetValue,
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var token = Effective(cancellationToken);
|
||||
var siteId = await ResolveSiteAsync(token);
|
||||
|
||||
// Audit Log #23 (ParentExecutionId): mirrors the Call path — stamp the
|
||||
// spawning inbound request's ExecutionId so future site-side audit
|
||||
// emission for routed waits can record this wait's parent. CorrelationId
|
||||
// is the per-operation lifecycle id, freshly minted per routed wait.
|
||||
var request = new RouteToWaitForAttributeRequest(
|
||||
Guid.NewGuid().ToString(), _instanceCode, attributeName,
|
||||
AttributeValueCodec.Encode(targetValue), timeout, DateTimeOffset.UtcNow,
|
||||
_parentExecutionId);
|
||||
|
||||
var response = await _instanceRouter.RouteToWaitForAttributeAsync(siteId, request, token);
|
||||
|
||||
if (!response.Success)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
response.ErrorMessage ?? "Remote attribute wait failed");
|
||||
}
|
||||
|
||||
return response.Matched;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets a single attribute value on the remote instance.
|
||||
/// </summary>
|
||||
|
||||
+6
@@ -623,5 +623,11 @@ public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture<MsSqlMig
|
||||
public Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
|
||||
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
// WaitForAttribute is not part of this fixture's routed-Call audit scenario;
|
||||
// mirror the other non-Call methods (unexercised here).
|
||||
public Task<RouteToWaitForAttributeResponse> RouteToWaitForAttributeAsync(
|
||||
string siteId, RouteToWaitForAttributeRequest request, CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
using NSubstitute;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.InboundApi;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.InboundAPI.Tests;
|
||||
|
||||
@@ -139,6 +140,116 @@ public class RouteHelperTests
|
||||
Assert.Equal("read failed", ex.Message);
|
||||
}
|
||||
|
||||
// --- WaitForAttribute (spec §6) ---
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_Matched_ReturnsTrue()
|
||||
{
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Any<RouteToWaitForAttributeRequest>(), Arg.Any<CancellationToken>())
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: true, Value: true, Quality: "Good", TimedOut: false,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow));
|
||||
|
||||
var matched = await CreateHelper().To("inst-1")
|
||||
.WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30));
|
||||
|
||||
Assert.True(matched);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_TimedOut_ReturnsFalse()
|
||||
{
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Any<RouteToWaitForAttributeRequest>(), Arg.Any<CancellationToken>())
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: false, Value: null, Quality: null, TimedOut: true,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow));
|
||||
|
||||
var matched = await CreateHelper().To("inst-1")
|
||||
.WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30));
|
||||
|
||||
Assert.False(matched);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_RoutingFailure_ThrowsInvalidOperationException()
|
||||
{
|
||||
// Success=false is a routing-level outcome (e.g. instance not found on the
|
||||
// site), distinct from the wait outcome (Matched/TimedOut).
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Any<RouteToWaitForAttributeRequest>(), Arg.Any<CancellationToken>())
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: false, Value: null, Quality: null, TimedOut: false,
|
||||
Success: false, ErrorMessage: "instance not found", DateTimeOffset.UtcNow));
|
||||
|
||||
var ex = await Assert.ThrowsAsync<InvalidOperationException>(
|
||||
() => CreateHelper().To("inst-1").WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30)));
|
||||
Assert.Equal("instance not found", ex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_EncodesTargetValue_OnRequest()
|
||||
{
|
||||
// Value-equality only across the wire: the target value is encoded via the
|
||||
// canonical AttributeValueCodec, identical to how attribute values travel.
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
RouteToWaitForAttributeRequest? captured = null;
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Do<RouteToWaitForAttributeRequest>(r => captured = r), Arg.Any<CancellationToken>())
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: true, Value: true, Quality: "Good", TimedOut: false,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow));
|
||||
|
||||
await CreateHelper().To("inst-1").WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30));
|
||||
|
||||
Assert.NotNull(captured);
|
||||
Assert.Equal("Flag", captured!.AttributeName);
|
||||
Assert.Equal(TimeSpan.FromSeconds(30), captured.Timeout);
|
||||
Assert.Equal(AttributeValueCodec.Encode(true), captured.TargetValueEncoded);
|
||||
Assert.True(Guid.TryParse(captured.CorrelationId, out _));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_WithNoExplicitToken_InheritsMethodDeadlineToken()
|
||||
{
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
using var deadline = new CancellationTokenSource();
|
||||
CancellationToken seen = default;
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Any<RouteToWaitForAttributeRequest>(), Arg.Do<CancellationToken>(t => seen = t))
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: false, Value: null, Quality: null, TimedOut: true,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow));
|
||||
|
||||
var bound = CreateHelper().WithDeadline(deadline.Token);
|
||||
await bound.To("inst-1").WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30));
|
||||
|
||||
Assert.Equal(deadline.Token, seen);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_WithParentExecutionId_CarriesItOnRequest()
|
||||
{
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
var inboundExecutionId = Guid.NewGuid();
|
||||
RouteToWaitForAttributeRequest? captured = null;
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Do<RouteToWaitForAttributeRequest>(r => captured = r), Arg.Any<CancellationToken>())
|
||||
.Returns(ci => new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: true, Value: true, Quality: "Good", TimedOut: false,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow));
|
||||
|
||||
var bound = CreateHelper().WithParentExecutionId(inboundExecutionId);
|
||||
await bound.To("inst-1").WaitForAttribute("Flag", true, TimeSpan.FromSeconds(30));
|
||||
|
||||
Assert.NotNull(captured);
|
||||
Assert.Equal(inboundExecutionId, captured!.ParentExecutionId);
|
||||
}
|
||||
|
||||
// --- SetAttribute(s) ---
|
||||
|
||||
[Fact]
|
||||
|
||||
Reference in New Issue
Block a user