615b487a77
Adds missing <summary>/<param> XML docs across 99 server, worker, and test files so CommentChecker reports zero issues (TreatWarningsAsErrors needs the analyzer clean). Bundles in WIP dashboard work: NavSection extraction, MainLayout/site.css/js styling alignment, and DashboardOptions/Auth tweaks.
1035 lines
46 KiB
C#
1035 lines
46 KiB
C#
using Grpc.Core;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.MxGateway.Contracts;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
|
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
|
using ZB.MOM.WW.MxGateway.Server.Security.Authentication;
|
|
using ZB.MOM.WW.MxGateway.Server.Security.Authorization;
|
|
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
|
using ZB.MOM.WW.MxGateway.Server.Workers;
|
|
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
|
|
|
|
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc;
|
|
|
|
/// <summary>
|
|
/// Tests for Server-021. <c>MxAccessGatewayService.ApplyConstraintsAsync</c> and
|
|
/// the <c>BulkConstraintPlan</c> / <c>ReadBulkConstraintPlan</c> /
|
|
/// <c>WriteBulkConstraintPlan</c> / <c>SubscribeBulkConstraintPlan</c> reply-merge
|
|
/// logic was previously exercised only with an allow-all enforcer, so denial
|
|
/// filtering, the no-allowed-items short-circuit, and the index-ordered
|
|
/// denied/allowed interleave were dead code at test time. The fixtures below
|
|
/// inject a <see cref="PredicateConstraintEnforcer"/> that denies a subset of
|
|
/// tags or handles, and assert the post-merge reply contents and that the
|
|
/// session manager is (or is not) invoked.
|
|
/// </summary>
|
|
public sealed class MxAccessGatewayServiceConstraintTests
|
|
{
|
|
private const string SessionId = "session-constraint";
|
|
|
|
// === SubscribeBulk family: AddItemBulk / SubscribeBulk / AdviseItemBulk ===
|
|
|
|
/// <summary>
|
|
/// <c>AddItemBulk</c> with a mix of allowed and denied tags must invoke the
|
|
/// worker once with only the allowed tags, then splice the denied entries
|
|
/// back into the reply at their original indices.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_AddItemBulk_WithMixedDenials_InterleavesDeniedAndAllowedInOriginalIndexOrder()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyTag = tag => tag == "Tank01.Locked" || tag == "Tank03.Secret",
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.AddItemBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
AddItemBulk = new BulkSubscribeReply
|
|
{
|
|
Results =
|
|
{
|
|
// Worker only sees the two allowed tags — Tank02.Open at original
|
|
// index 1 and Tank04.Public at original index 3.
|
|
new SubscribeResult { ServerHandle = 7, TagAddress = "Tank02.Open", ItemHandle = 102, WasSuccessful = true },
|
|
new SubscribeResult { ServerHandle = 7, TagAddress = "Tank04.Public", ItemHandle = 104, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateAddItemBulkRequest(7, ["Tank01.Locked", "Tank02.Open", "Tank03.Secret", "Tank04.Public"]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
// Worker saw only the allowed subset, in original order, with denied entries dropped.
|
|
AddItemBulkCommand forwardedCommand = sessionManager.LastWorkerCommand!.Command.AddItemBulk;
|
|
Assert.Equal(["Tank02.Open", "Tank04.Public"], forwardedCommand.TagAddresses);
|
|
// Final reply preserves the original 4-entry index order, with denied entries
|
|
// at index 0 and 2 and worker-allowed entries at index 1 and 3.
|
|
BulkSubscribeReply merged = reply.AddItemBulk;
|
|
Assert.Equal(4, merged.Results.Count);
|
|
Assert.False(merged.Results[0].WasSuccessful);
|
|
Assert.Equal("Tank01.Locked", merged.Results[0].TagAddress);
|
|
Assert.Contains("Tank01.Locked", merged.Results[0].ErrorMessage, StringComparison.Ordinal);
|
|
Assert.True(merged.Results[1].WasSuccessful);
|
|
Assert.Equal("Tank02.Open", merged.Results[1].TagAddress);
|
|
Assert.Equal(102, merged.Results[1].ItemHandle);
|
|
Assert.False(merged.Results[2].WasSuccessful);
|
|
Assert.Equal("Tank03.Secret", merged.Results[2].TagAddress);
|
|
Assert.True(merged.Results[3].WasSuccessful);
|
|
Assert.Equal("Tank04.Public", merged.Results[3].TagAddress);
|
|
Assert.Equal(104, merged.Results[3].ItemHandle);
|
|
// Both denied tags recorded.
|
|
Assert.Equal(2, enforcer.RecordedDenials.Count);
|
|
}
|
|
|
|
/// <summary>
|
|
/// <c>SubscribeBulk</c> when every tag is denied must short-circuit
|
|
/// <see cref="BulkConstraintPlan.HasAllowedItems"/> false, return the
|
|
/// denied-only reply, and never call the session manager.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_SubscribeBulk_WhenAllTagsDenied_DoesNotCallWorkerAndReturnsDeniedReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyTag = _ => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateSubscribeBulkRequest(7, ["A", "B", "C"]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Equal(3, reply.SubscribeBulk.Results.Count);
|
|
Assert.All(reply.SubscribeBulk.Results, r => Assert.False(r.WasSuccessful));
|
|
Assert.Equal(["A", "B", "C"], reply.SubscribeBulk.Results.Select(r => r.TagAddress));
|
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
|
}
|
|
|
|
/// <summary>
|
|
/// <c>AdviseItemBulk</c> takes handle inputs (not tags) and routes through
|
|
/// <c>FilterHandleBulkAsync</c> against <c>CheckReadHandleAsync</c>. Partial
|
|
/// denial must still produce a merged-by-index <c>BulkSubscribeReply</c>.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_AdviseItemBulk_WithMixedHandleDenials_MergesDeniedIntoReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyReadHandle = (_, itemHandle) => itemHandle == 502,
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.AdviseItemBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
AdviseItemBulk = new BulkSubscribeReply
|
|
{
|
|
Results =
|
|
{
|
|
new SubscribeResult { ServerHandle = 7, ItemHandle = 501, WasSuccessful = true },
|
|
new SubscribeResult { ServerHandle = 7, ItemHandle = 503, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateAdviseItemBulkRequest(7, [501, 502, 503]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
Assert.Equal([501, 503], sessionManager.LastWorkerCommand!.Command.AdviseItemBulk.ItemHandles);
|
|
BulkSubscribeReply merged = reply.AdviseItemBulk;
|
|
Assert.Equal(3, merged.Results.Count);
|
|
Assert.True(merged.Results[0].WasSuccessful);
|
|
Assert.Equal(501, merged.Results[0].ItemHandle);
|
|
Assert.False(merged.Results[1].WasSuccessful);
|
|
Assert.Equal(502, merged.Results[1].ItemHandle);
|
|
Assert.True(merged.Results[2].WasSuccessful);
|
|
Assert.Equal(503, merged.Results[2].ItemHandle);
|
|
}
|
|
|
|
/// <summary>
|
|
/// <c>SubscribeBulk</c> with an allow-all enforcer must leave the worker reply
|
|
/// unchanged — the constraint plan is null and no merge occurs. Regression
|
|
/// guard against accidentally engaging the merge path for the common case.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_SubscribeBulk_WithAllowAllEnforcer_PassesThroughUnchanged()
|
|
{
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.SubscribeBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
SubscribeBulk = new BulkSubscribeReply
|
|
{
|
|
Results =
|
|
{
|
|
new SubscribeResult { ServerHandle = 7, TagAddress = "A", ItemHandle = 1, WasSuccessful = true },
|
|
new SubscribeResult { ServerHandle = 7, TagAddress = "B", ItemHandle = 2, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateSubscribeBulkRequest(7, ["A", "B"]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
Assert.Equal(["A", "B"], sessionManager.LastWorkerCommand!.Command.SubscribeBulk.TagAddresses);
|
|
// Reply identical to worker reply — no synthetic denial rows added.
|
|
Assert.Equal(2, reply.SubscribeBulk.Results.Count);
|
|
Assert.All(reply.SubscribeBulk.Results, r => Assert.True(r.WasSuccessful));
|
|
}
|
|
|
|
// === ReadBulk family ===
|
|
|
|
/// <summary>
|
|
/// <c>ReadBulk</c> with a mix of allowed and denied tags merges denied entries
|
|
/// into the <c>BulkReadReply</c> in original-index order, distinguishable from
|
|
/// the SubscribeBulk family because the reply slot is
|
|
/// <c>BulkReadReply</c>, not <c>BulkSubscribeReply</c>.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_ReadBulk_WithMixedDenials_MergesDeniedBulkReadResults()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyTag = tag => tag == "Secret.Tag",
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.ReadBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
ReadBulk = new BulkReadReply
|
|
{
|
|
Results =
|
|
{
|
|
new BulkReadResult { ServerHandle = 7, TagAddress = "Public.A", WasSuccessful = true },
|
|
new BulkReadResult { ServerHandle = 7, TagAddress = "Public.B", WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateReadBulkRequest(7, ["Public.A", "Secret.Tag", "Public.B"]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
Assert.Equal(["Public.A", "Public.B"], sessionManager.LastWorkerCommand!.Command.ReadBulk.TagAddresses);
|
|
BulkReadReply merged = reply.ReadBulk;
|
|
Assert.Equal(3, merged.Results.Count);
|
|
Assert.True(merged.Results[0].WasSuccessful);
|
|
Assert.False(merged.Results[1].WasSuccessful);
|
|
Assert.Equal("Secret.Tag", merged.Results[1].TagAddress);
|
|
Assert.True(merged.Results[2].WasSuccessful);
|
|
}
|
|
|
|
/// <summary>
|
|
/// <c>ReadBulk</c> with all tags denied must short-circuit and produce a
|
|
/// denied-only <c>BulkReadReply</c> — verifying
|
|
/// <see cref="MxAccessGatewayService"/>'s <c>ReadBulkConstraintPlan</c>
|
|
/// <c>CreateDeniedReply</c> path.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_ReadBulk_WhenAllTagsDenied_ShortCircuitsWithDeniedOnlyReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyTag = _ => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateReadBulkRequest(7, ["X", "Y"]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Equal(2, reply.ReadBulk.Results.Count);
|
|
Assert.All(reply.ReadBulk.Results, r => Assert.False(r.WasSuccessful));
|
|
Assert.Equal(MxCommandKind.ReadBulk, reply.Kind);
|
|
}
|
|
|
|
// === WriteBulk family: WriteBulk / Write2Bulk / WriteSecuredBulk / WriteSecured2Bulk ===
|
|
|
|
/// <summary>
|
|
/// <c>WriteBulk</c> with one denied handle must drop that entry from the
|
|
/// forwarded command and splice a denied <c>BulkWriteResult</c> back in at
|
|
/// the original index.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteBulk_WithDeniedHandle_DropsEntryFromWorkerCallAndMergesDenialIntoReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyWriteHandle = (_, itemHandle) => itemHandle == 902,
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.WriteBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
WriteBulk = new BulkWriteReply
|
|
{
|
|
Results =
|
|
{
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 901, WasSuccessful = true },
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 903, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWriteBulkRequest(7, [901, 902, 903]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
// 902 dropped from forwarded entries; only 901 and 903 reach the worker.
|
|
WriteBulkCommand forwarded = sessionManager.LastWorkerCommand!.Command.WriteBulk;
|
|
Assert.Equal([901, 903], forwarded.Entries.Select(e => e.ItemHandle));
|
|
BulkWriteReply merged = reply.WriteBulk;
|
|
Assert.Equal(3, merged.Results.Count);
|
|
Assert.True(merged.Results[0].WasSuccessful);
|
|
Assert.Equal(901, merged.Results[0].ItemHandle);
|
|
Assert.False(merged.Results[1].WasSuccessful);
|
|
Assert.Equal(902, merged.Results[1].ItemHandle);
|
|
Assert.True(merged.Results[2].WasSuccessful);
|
|
Assert.Equal(903, merged.Results[2].ItemHandle);
|
|
}
|
|
|
|
/// <summary>
|
|
/// <c>WriteSecuredBulk</c> exercises a different <c>ReplaceWriteBulkEntries</c>
|
|
/// switch arm than plain <c>WriteBulk</c>. The merge logic is shared, so a
|
|
/// full denial here is enough to prove the secured-bulk routing.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteSecuredBulk_WhenAllHandlesDenied_ShortCircuitsWithDeniedOnlyReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyWriteHandle = (_, _) => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWriteSecuredBulkRequest(7, [10, 11]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Equal(MxCommandKind.WriteSecuredBulk, reply.Kind);
|
|
Assert.Equal(2, reply.WriteSecuredBulk.Results.Count);
|
|
Assert.All(reply.WriteSecuredBulk.Results, r => Assert.False(r.WasSuccessful));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Tests-020: <c>Write2Bulk</c> takes the third <c>GetPayload</c>/<c>SetPayload</c>
|
|
/// switch arm in <c>WriteBulkConstraintPlan</c>. The merge logic is shared with
|
|
/// <c>WriteBulk</c>, but a full denial through the <c>CreateDeniedReply</c> path
|
|
/// proves the <c>Write2Bulk</c> arm of the per-kind <c>SetPayload</c> switch fires
|
|
/// (and not, say, <c>WriteBulk</c> by mistake) — guarding against a refactor that
|
|
/// drops or misroutes the <c>Write2Bulk</c> case.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_Write2Bulk_WhenAllHandlesDenied_ShortCircuitsWithDeniedOnlyReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyWriteHandle = (_, _) => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWrite2BulkRequest(7, [10, 11]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Equal(MxCommandKind.Write2Bulk, reply.Kind);
|
|
Assert.Equal(2, reply.Write2Bulk.Results.Count);
|
|
Assert.All(reply.Write2Bulk.Results, r => Assert.False(r.WasSuccessful));
|
|
// Sibling reply slots must remain empty — pin the SetPayload arm fired
|
|
// for Write2Bulk and not for one of the other three Write*Bulk kinds.
|
|
Assert.Empty(reply.WriteBulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
Assert.Empty(reply.WriteSecuredBulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
Assert.Empty(reply.WriteSecured2Bulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Tests-020: <c>WriteSecured2Bulk</c> takes the fourth <c>GetPayload</c>/<c>SetPayload</c>
|
|
/// switch arm in <c>WriteBulkConstraintPlan</c>. Same reasoning as
|
|
/// <c>Write2Bulk</c> — assert the <c>WriteSecured2Bulk</c> reply slot is populated
|
|
/// to prove that arm of the switch fires.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteSecured2Bulk_WhenAllHandlesDenied_ShortCircuitsWithDeniedOnlyReply()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyWriteHandle = (_, _) => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWriteSecured2BulkRequest(7, [10, 11]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Equal(MxCommandKind.WriteSecured2Bulk, reply.Kind);
|
|
Assert.Equal(2, reply.WriteSecured2Bulk.Results.Count);
|
|
Assert.All(reply.WriteSecured2Bulk.Results, r => Assert.False(r.WasSuccessful));
|
|
// Sibling reply slots must remain empty — pin the SetPayload arm fired
|
|
// for WriteSecured2Bulk and not for one of the other three Write*Bulk kinds.
|
|
Assert.Empty(reply.WriteBulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
Assert.Empty(reply.Write2Bulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
Assert.Empty(reply.WriteSecuredBulk?.Results ?? new Google.Protobuf.Collections.RepeatedField<BulkWriteResult>());
|
|
}
|
|
|
|
// === Worker reply-count divergence (Tests-024) ===
|
|
|
|
/// <summary>
|
|
/// Tests-024: <c>WriteBulkConstraintPlan.MergeDeniedInto</c> dequeues from
|
|
/// <c>allowedResults</c> per non-denied slot via <c>Queue.TryDequeue</c>,
|
|
/// which silently returns <c>false</c> when the queue is empty. Pin the
|
|
/// observable behaviour when the worker returns FEWER allowed results than
|
|
/// the gateway forwarded: the merged reply is truncated — denied entries
|
|
/// keep their slots, but the trailing allowed slot for which no worker
|
|
/// result arrived is dropped (no synthetic failure result is fabricated).
|
|
/// This fixture makes that "silent truncate" behaviour explicit so a future
|
|
/// change either fills the gap with a synthetic failure or fails this test.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteBulk_WhenWorkerReturnsFewerResultsThanAllowed_MergedReplyIsTruncated()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyWriteHandle = (_, itemHandle) => itemHandle == 902,
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
// Gateway forwards 2 allowed handles (901, 903) but the worker returns only
|
|
// 1 result. The merge logic should keep denied entry 902 at index 1, place
|
|
// the single worker result at index 0, and leave index 2 empty (truncate).
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.WriteBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
WriteBulk = new BulkWriteReply
|
|
{
|
|
Results =
|
|
{
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 901, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWriteBulkRequest(7, [901, 902, 903]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
BulkWriteReply merged = reply.WriteBulk;
|
|
// Current behaviour: the merged reply is shorter than OriginalCount when
|
|
// the worker under-supplies. Two slots survive — the worker result at
|
|
// index 0 and the denied entry at index 1 — and the trailing slot is
|
|
// silently dropped via Queue.TryDequeue returning false.
|
|
Assert.Equal(2, merged.Results.Count);
|
|
Assert.True(merged.Results[0].WasSuccessful);
|
|
Assert.Equal(901, merged.Results[0].ItemHandle);
|
|
Assert.False(merged.Results[1].WasSuccessful);
|
|
Assert.Equal(902, merged.Results[1].ItemHandle);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Tests-024: when the worker returns MORE allowed results than the
|
|
/// gateway forwarded, the extras must be silently ignored — the merged
|
|
/// reply length stays at <c>OriginalCount</c>. This pins the
|
|
/// <c>for index < OriginalCount</c> loop bound so a regression that
|
|
/// accidentally surfaces extras as trailing results is caught.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteBulk_WhenWorkerReturnsExtraResults_IgnoresExtras()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyWriteHandle = (_, itemHandle) => itemHandle == 902,
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
// Gateway forwards 2 allowed handles (901, 903) but the worker returns 4.
|
|
sessionManager.InvokeReply = new WorkerCommandReply
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.WriteBulk,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
WriteBulk = new BulkWriteReply
|
|
{
|
|
Results =
|
|
{
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 901, WasSuccessful = true },
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 903, WasSuccessful = true },
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 999, WasSuccessful = true },
|
|
new BulkWriteResult { ServerHandle = 7, ItemHandle = 1000, WasSuccessful = true },
|
|
},
|
|
},
|
|
},
|
|
};
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
MxCommandReply reply = await service.Invoke(
|
|
CreateWriteBulkRequest(7, [901, 902, 903]),
|
|
new TestServerCallContext());
|
|
|
|
Assert.Equal(1, sessionManager.InvokeCount);
|
|
BulkWriteReply merged = reply.WriteBulk;
|
|
// Merged reply length stays at OriginalCount (3); the two extra worker
|
|
// results (item handles 999, 1000) are silently discarded by the
|
|
// OriginalCount-bounded loop.
|
|
Assert.Equal(3, merged.Results.Count);
|
|
Assert.Equal(901, merged.Results[0].ItemHandle);
|
|
Assert.True(merged.Results[0].WasSuccessful);
|
|
Assert.Equal(902, merged.Results[1].ItemHandle);
|
|
Assert.False(merged.Results[1].WasSuccessful);
|
|
Assert.Equal(903, merged.Results[2].ItemHandle);
|
|
Assert.True(merged.Results[2].WasSuccessful);
|
|
Assert.DoesNotContain(merged.Results, r => r.ItemHandle == 999);
|
|
Assert.DoesNotContain(merged.Results, r => r.ItemHandle == 1000);
|
|
}
|
|
|
|
// === Unary write-handle enforcement (EnforceWriteHandleAsync) ===
|
|
|
|
/// <summary>
|
|
/// Unary <c>Write</c> against a denied (server, item) handle must surface
|
|
/// <see cref="StatusCode.PermissionDenied"/> via <c>EnforceWriteHandleAsync</c>
|
|
/// and never reach the session manager.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_Write_WithDeniedHandle_ThrowsPermissionDeniedAndDoesNotCallWorker()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyWriteHandle = (serverHandle, itemHandle) => serverHandle == 7 && itemHandle == 42,
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
|
async () => await service.Invoke(
|
|
CreateWriteRequest(serverHandle: 7, itemHandle: 42),
|
|
new TestServerCallContext()));
|
|
|
|
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Single(enforcer.RecordedDenials);
|
|
Assert.Equal("42", enforcer.RecordedDenials[0].Target);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Unary <c>WriteSecured</c> against a denied handle takes the same enforce path
|
|
/// and rejects identically — proving the four-arm switch in
|
|
/// <c>ApplyConstraintsAsync</c> (Write/Write2/WriteSecured/WriteSecured2) is
|
|
/// reachable for at least one of the secured kinds.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_WriteSecured_WithDeniedHandle_ThrowsPermissionDenied()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new() { DenyWriteHandle = (_, _) => true };
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
|
async () => await service.Invoke(
|
|
CreateWriteSecuredRequest(serverHandle: 7, itemHandle: 42),
|
|
new TestServerCallContext()));
|
|
|
|
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
}
|
|
|
|
// === Unary read-tag enforcement (EnforceReadTagAsync via AddItem) ===
|
|
|
|
/// <summary>
|
|
/// Unary <c>AddItem</c> against a denied tag must surface
|
|
/// <see cref="StatusCode.PermissionDenied"/> via <c>EnforceReadTagAsync</c>
|
|
/// and never reach the session manager.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Invoke_AddItem_WithDeniedTag_ThrowsPermissionDeniedAndDoesNotCallWorker()
|
|
{
|
|
PredicateConstraintEnforcer enforcer = new()
|
|
{
|
|
DenyTag = tag => tag == "Secret.Tag",
|
|
};
|
|
FakeSessionManager sessionManager = CreateSessionManagerWithSeed();
|
|
MxAccessGatewayService service = CreateService(sessionManager, enforcer);
|
|
|
|
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
|
async () => await service.Invoke(
|
|
CreateAddItemRequest(serverHandle: 7, tagAddress: "Secret.Tag"),
|
|
new TestServerCallContext()));
|
|
|
|
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
|
|
Assert.Equal(0, sessionManager.InvokeCount);
|
|
Assert.Single(enforcer.RecordedDenials);
|
|
Assert.Equal("Secret.Tag", enforcer.RecordedDenials[0].Target);
|
|
}
|
|
|
|
// === Helpers ===
|
|
|
|
private static MxAccessGatewayService CreateService(
|
|
FakeSessionManager sessionManager,
|
|
IConstraintEnforcer? constraintEnforcer = null)
|
|
{
|
|
return new MxAccessGatewayService(
|
|
sessionManager,
|
|
new GatewayRequestIdentityAccessor(),
|
|
constraintEnforcer ?? new AllowAllConstraintEnforcer(),
|
|
new MxAccessGrpcRequestValidator(),
|
|
new MxAccessGrpcMapper(),
|
|
new FakeEventStreamService(sessionManager),
|
|
new GatewayMetrics(),
|
|
NullLogger<MxAccessGatewayService>.Instance,
|
|
new FakeGatewayAlarmService());
|
|
}
|
|
|
|
private static FakeSessionManager CreateSessionManagerWithSeed()
|
|
{
|
|
FakeSessionManager sessionManager = new() { ResolveOnlySeededSessions = true };
|
|
sessionManager.SeedSession(CreateSession(SessionId));
|
|
return sessionManager;
|
|
}
|
|
|
|
private static GatewaySession CreateSession(string sessionId)
|
|
{
|
|
GatewaySession session = new(
|
|
sessionId,
|
|
GatewayContractInfo.DefaultBackendName,
|
|
"pipe",
|
|
"nonce",
|
|
"Operator Key",
|
|
"operator-session",
|
|
"client-correlation",
|
|
TimeSpan.FromSeconds(7),
|
|
TimeSpan.FromSeconds(30),
|
|
TimeSpan.FromSeconds(10),
|
|
DateTimeOffset.UtcNow);
|
|
session.AttachWorkerClient(new FakeWorkerClient());
|
|
session.MarkReady();
|
|
return session;
|
|
}
|
|
|
|
private static MxCommandRequest CreateAddItemBulkRequest(int serverHandle, IReadOnlyList<string> tags)
|
|
{
|
|
AddItemBulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
cmd.TagAddresses.Add(tags);
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.AddItemBulk, AddItemBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateSubscribeBulkRequest(int serverHandle, IReadOnlyList<string> tags)
|
|
{
|
|
SubscribeBulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
cmd.TagAddresses.Add(tags);
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateAdviseItemBulkRequest(int serverHandle, IReadOnlyList<int> itemHandles)
|
|
{
|
|
AdviseItemBulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
cmd.ItemHandles.Add(itemHandles);
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.AdviseItemBulk, AdviseItemBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateReadBulkRequest(int serverHandle, IReadOnlyList<string> tags)
|
|
{
|
|
ReadBulkCommand cmd = new() { ServerHandle = serverHandle, TimeoutMs = 1000 };
|
|
cmd.TagAddresses.Add(tags);
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.ReadBulk, ReadBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWriteBulkRequest(int serverHandle, IReadOnlyList<int> itemHandles)
|
|
{
|
|
WriteBulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
foreach (int handle in itemHandles)
|
|
{
|
|
cmd.Entries.Add(new WriteBulkEntry { ItemHandle = handle, Value = new MxValue { StringValue = "v" } });
|
|
}
|
|
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.WriteBulk, WriteBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWriteSecuredBulkRequest(int serverHandle, IReadOnlyList<int> itemHandles)
|
|
{
|
|
WriteSecuredBulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
foreach (int handle in itemHandles)
|
|
{
|
|
cmd.Entries.Add(new WriteSecuredBulkEntry
|
|
{
|
|
ItemHandle = handle,
|
|
CurrentUserId = 1,
|
|
VerifierUserId = 2,
|
|
Value = new MxValue { StringValue = "v" },
|
|
});
|
|
}
|
|
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.WriteSecuredBulk, WriteSecuredBulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWrite2BulkRequest(int serverHandle, IReadOnlyList<int> itemHandles)
|
|
{
|
|
Write2BulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
foreach (int handle in itemHandles)
|
|
{
|
|
cmd.Entries.Add(new Write2BulkEntry
|
|
{
|
|
ItemHandle = handle,
|
|
Value = new MxValue { StringValue = "v" },
|
|
TimestampValue = new MxValue { Int64Value = 1234567890L },
|
|
});
|
|
}
|
|
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.Write2Bulk, Write2Bulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWriteSecured2BulkRequest(int serverHandle, IReadOnlyList<int> itemHandles)
|
|
{
|
|
WriteSecured2BulkCommand cmd = new() { ServerHandle = serverHandle };
|
|
foreach (int handle in itemHandles)
|
|
{
|
|
cmd.Entries.Add(new WriteSecured2BulkEntry
|
|
{
|
|
ItemHandle = handle,
|
|
CurrentUserId = 1,
|
|
VerifierUserId = 2,
|
|
Value = new MxValue { StringValue = "v" },
|
|
TimestampValue = new MxValue { Int64Value = 1234567890L },
|
|
});
|
|
}
|
|
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand { Kind = MxCommandKind.WriteSecured2Bulk, WriteSecured2Bulk = cmd },
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWriteRequest(int serverHandle, int itemHandle)
|
|
{
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Write,
|
|
Write = new WriteCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemHandle = itemHandle,
|
|
Value = new MxValue { StringValue = "v" },
|
|
},
|
|
},
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateWriteSecuredRequest(int serverHandle, int itemHandle)
|
|
{
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand
|
|
{
|
|
Kind = MxCommandKind.WriteSecured,
|
|
WriteSecured = new WriteSecuredCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemHandle = itemHandle,
|
|
CurrentUserId = 1,
|
|
VerifierUserId = 2,
|
|
Value = new MxValue { StringValue = "v" },
|
|
},
|
|
},
|
|
};
|
|
}
|
|
|
|
private static MxCommandRequest CreateAddItemRequest(int serverHandle, string tagAddress)
|
|
{
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = SessionId,
|
|
Command = new MxCommand
|
|
{
|
|
Kind = MxCommandKind.AddItem,
|
|
AddItem = new AddItemCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemDefinition = tagAddress,
|
|
},
|
|
},
|
|
};
|
|
}
|
|
|
|
// FakeSessionManager / FakeEventStreamService / FakeWorkerClient mirror the
|
|
// implementations in MxAccessGatewayServiceTests; the duplication is intentional
|
|
// so the constraint tests are self-contained and changes to the existing fakes
|
|
// don't accidentally couple the two suites.
|
|
private sealed class FakeSessionManager : ISessionManager
|
|
{
|
|
private readonly Dictionary<string, GatewaySession> seededSessions = new(StringComparer.Ordinal);
|
|
|
|
/// <summary>Gets a value indicating whether only seeded sessions should be resolved.</summary>
|
|
public bool ResolveOnlySeededSessions { get; init; }
|
|
|
|
/// <summary>Gets the last worker command that was invoked.</summary>
|
|
public WorkerCommand? LastWorkerCommand { get; private set; }
|
|
|
|
/// <summary>Gets the count of invoke calls made.</summary>
|
|
public int InvokeCount { get; private set; }
|
|
|
|
/// <summary>Gets or sets the default invoke reply to return.</summary>
|
|
public WorkerCommandReply InvokeReply { get; set; } = new()
|
|
{
|
|
Reply = new MxCommandReply
|
|
{
|
|
SessionId = SessionId,
|
|
Kind = MxCommandKind.Ping,
|
|
ProtocolStatus = MxAccessGrpcMapper.Ok(),
|
|
},
|
|
};
|
|
|
|
/// <summary>Gets the collection of events to stream.</summary>
|
|
public List<WorkerEvent> Events { get; } = [];
|
|
|
|
/// <summary>Seeds a test session into the fake manager.</summary>
|
|
/// <param name="session">The session to seed.</param>
|
|
public void SeedSession(GatewaySession session) => seededSessions[session.SessionId] = session;
|
|
|
|
/// <summary>Opens a test session asynchronously.</summary>
|
|
/// <param name="request">The session open request.</param>
|
|
/// <param name="clientIdentity">The client identity, if any.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<GatewaySession> OpenSessionAsync(
|
|
SessionOpenRequest request,
|
|
string? clientIdentity,
|
|
CancellationToken cancellationToken) =>
|
|
Task.FromResult(seededSessions.Values.First());
|
|
|
|
/// <summary>Tries to get a test session by identifier.</summary>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="session">The session, if found.</param>
|
|
public bool TryGetSession(string sessionId, out GatewaySession session)
|
|
{
|
|
if (seededSessions.TryGetValue(sessionId, out GatewaySession? seeded))
|
|
{
|
|
session = seeded;
|
|
return true;
|
|
}
|
|
|
|
if (ResolveOnlySeededSessions)
|
|
{
|
|
session = null!;
|
|
return false;
|
|
}
|
|
|
|
session = CreateFallbackSession(sessionId);
|
|
return true;
|
|
}
|
|
|
|
/// <summary>Invokes a worker command and returns the reply asynchronously.</summary>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="command">The worker command.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<WorkerCommandReply> InvokeAsync(
|
|
string sessionId,
|
|
WorkerCommand command,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
InvokeCount++;
|
|
LastWorkerCommand = command;
|
|
return Task.FromResult(InvokeReply);
|
|
}
|
|
|
|
/// <summary>Reads events from the session asynchronously.</summary>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
|
string sessionId,
|
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
|
{
|
|
foreach (WorkerEvent ev in Events)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
await Task.Yield();
|
|
yield return ev;
|
|
}
|
|
}
|
|
|
|
/// <summary>Closes a test session asynchronously.</summary>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<SessionCloseResult> CloseSessionAsync(
|
|
string sessionId,
|
|
CancellationToken cancellationToken) =>
|
|
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
|
|
|
/// <summary>Kills a worker process asynchronously.</summary>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="reason">The reason for killing the worker.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<SessionCloseResult> KillWorkerAsync(
|
|
string sessionId,
|
|
string reason,
|
|
CancellationToken cancellationToken) =>
|
|
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
|
|
|
/// <summary>Closes expired session leases asynchronously.</summary>
|
|
/// <param name="now">The current time to check against.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<int> CloseExpiredLeasesAsync(
|
|
DateTimeOffset now,
|
|
CancellationToken cancellationToken) => Task.FromResult(0);
|
|
|
|
/// <summary>Shuts down the test session manager asynchronously.</summary>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
|
|
|
private static GatewaySession CreateFallbackSession(string sessionId)
|
|
{
|
|
GatewaySession session = new(
|
|
sessionId,
|
|
GatewayContractInfo.DefaultBackendName,
|
|
"pipe",
|
|
"nonce",
|
|
"Operator Key",
|
|
"operator-session",
|
|
"client-correlation",
|
|
TimeSpan.FromSeconds(7),
|
|
TimeSpan.FromSeconds(30),
|
|
TimeSpan.FromSeconds(10),
|
|
DateTimeOffset.UtcNow);
|
|
session.AttachWorkerClient(new FakeWorkerClient());
|
|
session.MarkReady();
|
|
return session;
|
|
}
|
|
}
|
|
|
|
private sealed class FakeEventStreamService(FakeSessionManager sessionManager) : IEventStreamService
|
|
{
|
|
/// <summary>Streams events for the test session asynchronously.</summary>
|
|
/// <param name="request">The stream events request.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
|
StreamEventsRequest request,
|
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
|
{
|
|
foreach (WorkerEvent ev in sessionManager.Events)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
await Task.Yield();
|
|
yield return ev.Event;
|
|
}
|
|
}
|
|
}
|
|
|
|
private sealed class FakeWorkerClient : IWorkerClient
|
|
{
|
|
/// <summary>Gets the test session identifier.</summary>
|
|
public string SessionId { get; } = MxAccessGatewayServiceConstraintTests.SessionId;
|
|
|
|
/// <summary>Gets the test worker process identifier.</summary>
|
|
public int? ProcessId { get; } = 1234;
|
|
|
|
/// <summary>Gets the test worker client state.</summary>
|
|
public WorkerClientState State { get; } = WorkerClientState.Ready;
|
|
|
|
/// <summary>Gets the last recorded heartbeat time.</summary>
|
|
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
|
|
|
/// <summary>Starts the test worker client asynchronously.</summary>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
|
|
|
/// <summary>Invokes a command on the test worker asynchronously.</summary>
|
|
/// <param name="command">The worker command.</param>
|
|
/// <param name="timeout">Maximum time to wait for completion.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task<WorkerCommandReply> InvokeAsync(
|
|
WorkerCommand command,
|
|
TimeSpan timeout,
|
|
CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply());
|
|
|
|
/// <summary>Reads events from the test worker asynchronously.</summary>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
|
{
|
|
await Task.CompletedTask;
|
|
yield break;
|
|
}
|
|
|
|
/// <summary>Shuts down the test worker client asynchronously.</summary>
|
|
/// <param name="timeout">Maximum time to wait for completion.</param>
|
|
/// <param name="cancellationToken">Token to observe for cancellation.</param>
|
|
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
|
|
|
|
/// <summary>Kills the test worker process.</summary>
|
|
/// <param name="reason">The reason for killing the worker.</param>
|
|
public void Kill(string reason)
|
|
{
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
|
}
|
|
}
|