feat: thread BrowseNext continuation token through actor + BrowseService (T15)
This commit is contained in:
@@ -40,6 +40,7 @@ public sealed class BrowseService : IBrowseService
|
|||||||
string siteId,
|
string siteId,
|
||||||
string connectionName,
|
string connectionName,
|
||||||
string? parentNodeId,
|
string? parentNodeId,
|
||||||
|
string? continuationToken = null,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
// CentralUI-side role guard — sites don't enforce envelope-level roles,
|
// CentralUI-side role guard — sites don't enforce envelope-level roles,
|
||||||
@@ -57,7 +58,7 @@ public sealed class BrowseService : IBrowseService
|
|||||||
{
|
{
|
||||||
return await _communication.BrowseNodeAsync(
|
return await _communication.BrowseNodeAsync(
|
||||||
siteId,
|
siteId,
|
||||||
new BrowseNodeCommand(connectionName, parentNodeId),
|
new BrowseNodeCommand(connectionName, parentNodeId, continuationToken),
|
||||||
cancellationToken);
|
cancellationToken);
|
||||||
}
|
}
|
||||||
catch (TimeoutException ex)
|
catch (TimeoutException ex)
|
||||||
|
|||||||
@@ -28,11 +28,13 @@ public interface IBrowseService
|
|||||||
/// <param name="siteId">The target site identifier.</param>
|
/// <param name="siteId">The target site identifier.</param>
|
||||||
/// <param name="connectionName">Name of the site-local data connection to browse against — the site's <c>DataConnectionManagerActor</c> indexes its children by name.</param>
|
/// <param name="connectionName">Name of the site-local data connection to browse against — the site's <c>DataConnectionManagerActor</c> indexes its children by name.</param>
|
||||||
/// <param name="parentNodeId">Node to browse, or <c>null</c> to browse from the server root.</param>
|
/// <param name="parentNodeId">Node to browse, or <c>null</c> to browse from the server root.</param>
|
||||||
|
/// <param name="continuationToken">Opaque cursor from a prior <see cref="BrowseNodeResult.ContinuationToken"/> to fetch the NEXT page under <paramref name="parentNodeId"/>; <c>null</c> for the first page.</param>
|
||||||
/// <param name="cancellationToken">Cancellation token.</param>
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
/// <returns>A task that resolves to a <see cref="BrowseNodeResult"/> containing child nodes or a <see cref="BrowseFailure"/> on error.</returns>
|
/// <returns>A task that resolves to a <see cref="BrowseNodeResult"/> containing child nodes or a <see cref="BrowseFailure"/> on error.</returns>
|
||||||
Task<BrowseNodeResult> BrowseChildrenAsync(
|
Task<BrowseNodeResult> BrowseChildrenAsync(
|
||||||
string siteId,
|
string siteId,
|
||||||
string connectionName,
|
string connectionName,
|
||||||
string? parentNodeId,
|
string? parentNodeId,
|
||||||
|
string? continuationToken = null,
|
||||||
CancellationToken cancellationToken = default);
|
CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,14 +15,30 @@ namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
|
|||||||
/// </remarks>
|
/// </remarks>
|
||||||
/// <param name="ConnectionName">Name of the site-local data connection to browse against.</param>
|
/// <param name="ConnectionName">Name of the site-local data connection to browse against.</param>
|
||||||
/// <param name="ParentNodeId">Node to browse, or null to browse from the server root (ObjectsFolder).</param>
|
/// <param name="ParentNodeId">Node to browse, or null to browse from the server root (ObjectsFolder).</param>
|
||||||
|
/// <param name="ContinuationToken">
|
||||||
|
/// Opaque adapter cursor for fetching the NEXT page of children under
|
||||||
|
/// <see cref="ParentNodeId"/>. Null on the first request; on a subsequent
|
||||||
|
/// request carry back the token from the prior <see cref="BrowseNodeResult.ContinuationToken"/>.
|
||||||
|
/// Additive (appended last) so positional construction stays source-compatible.
|
||||||
|
/// </param>
|
||||||
public record BrowseNodeCommand(
|
public record BrowseNodeCommand(
|
||||||
string ConnectionName,
|
string ConnectionName,
|
||||||
string? ParentNodeId);
|
string? ParentNodeId,
|
||||||
|
string? ContinuationToken = null);
|
||||||
|
|
||||||
|
/// <param name="Children">Immediate children resolved for the browsed node (this page only).</param>
|
||||||
|
/// <param name="Truncated">True when the result was clipped (frame-budget cap or adapter-reported truncation).</param>
|
||||||
|
/// <param name="Failure">Structured failure, or null on success.</param>
|
||||||
|
/// <param name="ContinuationToken">
|
||||||
|
/// Opaque adapter cursor for the NEXT page when more children remain, or null
|
||||||
|
/// when this is the final page. Surface it back in a follow-up
|
||||||
|
/// <see cref="BrowseNodeCommand.ContinuationToken"/>. Additive (appended last).
|
||||||
|
/// </param>
|
||||||
public record BrowseNodeResult(
|
public record BrowseNodeResult(
|
||||||
IReadOnlyList<BrowseNode> Children,
|
IReadOnlyList<BrowseNode> Children,
|
||||||
bool Truncated,
|
bool Truncated,
|
||||||
BrowseFailure? Failure);
|
BrowseFailure? Failure,
|
||||||
|
string? ContinuationToken = null);
|
||||||
|
|
||||||
public record BrowseFailure(
|
public record BrowseFailure(
|
||||||
BrowseFailureKind Kind,
|
BrowseFailureKind Kind,
|
||||||
|
|||||||
@@ -1163,14 +1163,16 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
|
|
||||||
_log.Debug("[{0}] Browsing children of {1}", _connectionName, command.ParentNodeId ?? "(root)");
|
_log.Debug("[{0}] Browsing children of {1}", _connectionName, command.ParentNodeId ?? "(root)");
|
||||||
|
|
||||||
browsable.BrowseChildrenAsync(command.ParentNodeId).ContinueWith(t =>
|
browsable.BrowseChildrenAsync(command.ParentNodeId, command.ContinuationToken).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
if (t.IsCompletedSuccessfully)
|
if (t.IsCompletedSuccessfully)
|
||||||
{
|
{
|
||||||
// Bound the reply to stay under Akka's remote frame size before it
|
// Bound the reply to stay under Akka's remote frame size before it
|
||||||
// crosses the site→central boundary (see CapBrowseChildren).
|
// crosses the site→central boundary (see CapBrowseChildren).
|
||||||
var (children, truncated) = CapBrowseChildren(t.Result.Children, t.Result.Truncated);
|
var (children, truncated) = CapBrowseChildren(t.Result.Children, t.Result.Truncated);
|
||||||
return new BrowseNodeResult(children, truncated, Failure: null);
|
// Carry the adapter's continuation cursor through so the UI can ask
|
||||||
|
// for the next page (BrowseNext). Null when this is the final page.
|
||||||
|
return new BrowseNodeResult(children, truncated, Failure: null, t.Result.ContinuationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
var baseEx = t.Exception?.GetBaseException();
|
var baseEx = t.Exception?.GetBaseException();
|
||||||
|
|||||||
+53
@@ -253,4 +253,57 @@ public class DataConnectionManagerBrowseHandlerTests : TestKit
|
|||||||
var keptBytes = reply.Children.Sum(n => 64 + n.NodeId.Length + n.DisplayName.Length);
|
var keptBytes = reply.Children.Sum(n => 64 + n.NodeId.Length + n.DisplayName.Length);
|
||||||
Assert.True(keptBytes <= byteBudget, $"kept estimate {keptBytes} exceeds budget {byteBudget}");
|
Assert.True(keptBytes <= byteBudget, $"kept estimate {keptBytes} exceeds budget {byteBudget}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ContinuationToken_round_trips_through_the_browse_handler()
|
||||||
|
{
|
||||||
|
// Task B3 (T15): the BrowseNext continuation token must thread through the
|
||||||
|
// actor plumbing in both directions — the inbound command's token must reach
|
||||||
|
// the adapter, and the adapter's returned token must be surfaced on the reply
|
||||||
|
// so the UI can request the next page. We capture the token the adapter is
|
||||||
|
// called with and return a fresh one, then assert both halves.
|
||||||
|
const string inboundToken = "page-1-cursor";
|
||||||
|
const string outboundToken = "page-2-cursor";
|
||||||
|
string? receivedToken = "NOT-CALLED";
|
||||||
|
|
||||||
|
var adapter = Substitute.For<IDataConnection, IBrowsableDataConnection>();
|
||||||
|
((IDataConnection)adapter).ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
((IDataConnection)adapter).Status.Returns(ConnectionHealth.Connected);
|
||||||
|
|
||||||
|
var children = new[]
|
||||||
|
{
|
||||||
|
new BrowseNode("ns=2;s=Next", "Next", BrowseNodeClass.Variable, HasChildren: false),
|
||||||
|
};
|
||||||
|
((IBrowsableDataConnection)adapter)
|
||||||
|
.BrowseChildrenAsync(Arg.Any<string?>(), Arg.Any<string?>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(ci =>
|
||||||
|
{
|
||||||
|
receivedToken = ci.ArgAt<string?>(1);
|
||||||
|
return new BrowseChildrenResult(children, Truncated: true, ContinuationToken: outboundToken);
|
||||||
|
});
|
||||||
|
|
||||||
|
_factory.Create("OpcUa", Arg.Any<IDictionary<string, string>>())
|
||||||
|
.Returns((IDataConnection)adapter);
|
||||||
|
|
||||||
|
var manager = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new DataConnectionManagerActor(_factory, _options, _healthCollector, null)));
|
||||||
|
manager.Tell(new CreateConnectionCommand(
|
||||||
|
"conn-paged", "OpcUa", new Dictionary<string, string>(), null, 3));
|
||||||
|
|
||||||
|
AwaitCondition(
|
||||||
|
() => _factory.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "Create"),
|
||||||
|
TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
manager.Tell(new BrowseNodeCommand("conn-paged", ParentNodeId: "ns=2;s=Folder", ContinuationToken: inboundToken));
|
||||||
|
|
||||||
|
var reply = ExpectMsg<BrowseNodeResult>(TimeSpan.FromSeconds(3));
|
||||||
|
Assert.Null(reply.Failure);
|
||||||
|
// Inbound token reached the adapter verbatim.
|
||||||
|
Assert.Equal(inboundToken, receivedToken);
|
||||||
|
// Outbound token from the adapter is surfaced on the reply for the next page.
|
||||||
|
Assert.Equal(outboundToken, reply.ContinuationToken);
|
||||||
|
Assert.True(reply.Truncated);
|
||||||
|
Assert.Single(reply.Children);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user