Compare commits

38 Commits

Author SHA1 Message Date
Joseph Doherty 8df5ab381a docs(stillpending): mark §8 items resolved by section8-completion branch 2026-06-16 17:42:42 -04:00
Joseph Doherty 6f21d926d7 test(java-cli): cover galaxy-discover/galaxy-watch over in-process harness 2026-06-16 17:32:08 -04:00
Joseph Doherty aa8ae6613b refactor(java-cli): drop dead no-arg GalaxyCommand ctors (Task 3 review) 2026-06-16 17:28:43 -04:00
Joseph Doherty 44d676aede test(server): restore entireProcessTree kill assertion in WorkerProcessLauncherTests (Task 9 review) 2026-06-16 17:22:46 -04:00
Joseph Doherty e2b1a6686a feat(java-cli): inject GalaxyClientFactory seam (behavior-preserving default) 2026-06-16 17:18:47 -04:00
Joseph Doherty 01bdb484de refactor(tests): consolidate FakeWorkerProcess onto TestSupport canonical 2026-06-16 17:14:03 -04:00
Joseph Doherty 3bb4d5a082 test(java-cli): cover stream-events over in-process harness 2026-06-16 17:09:44 -04:00
Joseph Doherty 4ab3bd55e5 fix(server): single-clock poll loop + drop dead sync GetReadyWorkerClient (Task 8 review) 2026-06-16 17:04:18 -04:00
Joseph Doherty 70d2842c16 test(java-cli): add in-process gRPC harness fixture 2026-06-16 16:55:48 -04:00
Joseph Doherty 4966ef3359 feat(server): bounded worker-ready wait in GatewaySession (default off) 2026-06-16 16:48:02 -04:00
Joseph Doherty 0efa7d8cca test(java-cli): cover secured/secured2/bench bulk + close-session 2026-06-16 16:42:28 -04:00
Joseph Doherty ea17528767 feat(server): add MxGateway:Sessions:WorkerReadyWaitTimeoutMs (default off)
Adds WorkerReadyWaitTimeoutMs to SessionOptions (default 0 = disabled),
validates >= 0 in GatewayOptionsValidator, documents it in
GatewayConfiguration.md, and adds validator + default-value tests.
No wait/poll logic is implemented here (that is Task 8).
2026-06-16 16:38:31 -04:00
Joseph Doherty 1cfad83c06 test(java-cli): cover read-bulk/write-bulk/write2-bulk round trips 2026-06-16 16:33:42 -04:00
Joseph Doherty 76ffd5c9a3 docs: implementation plan for stillpending §8 completion (9 tasks) 2026-06-16 16:29:49 -04:00
Joseph Doherty 6030bfa18e docs: design for stillpending §8 completion (Approach C)
Also codify targeted-test-per-task rule in CLAUDE.md Source Update Workflow.
2026-06-16 16:19:49 -04:00
Joseph Doherty 82755a3623 docs(stillpending): reflect session-resilience merge (multi-subscriber resolved, reconnect core) + disable-login flag 2026-06-16 15:53:11 -04:00
Joseph Doherty 121ab7e263 fix(galaxy): include undeployed areas in browse + re-root orphaned objects
The hierarchy query returned deployed objects only (deployed_package_id <> 0), so
areas whose containing area is undeployed were orphaned and hidden from /browse —
on wonder, only the lone deployed root area surfaced. Include category-13 Area
objects regardless of deployment, and in GalaxyHierarchyIndex re-root any object
whose parent is absent from the set (e.g. a deleted container area) so nothing
disappears under a phantom parent id.
2026-06-16 12:49:03 -04:00
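The re-rooting step described in this commit can be sketched roughly as below. This is only an illustration, not the actual `GalaxyHierarchyIndex` code: the class name `ReRootSketch`, the `ROOT_ID` sentinel of 0, and the id-to-parent map shape are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: re-root objects whose parent id is missing from the set. */
final class ReRootSketch {
    /** Assumed sentinel meaning "no parent"; the real index may use another value. */
    static final int ROOT_ID = 0;

    /**
     * Returns a copy of parentById in which every object whose parent is not
     * itself a key in the map is attached to ROOT_ID instead.
     */
    static Map<Integer, Integer> reRootOrphans(Map<Integer, Integer> parentById) {
        Map<Integer, Integer> result = new HashMap<>();
        for (Map.Entry<Integer, Integer> entry : parentById.entrySet()) {
            int parent = entry.getValue();
            boolean parentKnown = parent == ROOT_ID || parentById.containsKey(parent);
            result.put(entry.getKey(), parentKnown ? parent : ROOT_ID);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> parents = new HashMap<>();
        parents.put(1, ROOT_ID); // root area
        parents.put(2, 1);       // child of a known area
        parents.put(3, 99);      // parent 99 is absent, e.g. a deleted container area
        System.out.println(reRootOrphans(parents));
    }
}
```

In this sketch object 3 ends up under the root rather than hanging off the phantom parent id 99, which is the behavior the commit message describes.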
Joseph Doherty ca443b1903 test+docs(dashboard): assert hub policy under disable-login; correct warning/doc wording 2026-06-16 09:30:13 -04:00
Joseph Doherty 7a2da4d8b6 chore(plan): mark dashboard disable-login tasks complete 2026-06-16 09:23:50 -04:00
Joseph Doherty a0b21ca225 refactor(dashboard): consistent effective-user in disable-login warning; doc the config read 2026-06-16 09:23:13 -04:00
Joseph Doherty 3690e4c2ca feat(dashboard): swap to auto-login handler when DisableLogin is set 2026-06-16 09:14:27 -04:00
Joseph Doherty 1d652b24c6 refactor(dashboard): normalize auto-login user in ctor; clarify claim-shape doc; add custom-user test 2026-06-16 08:23:14 -04:00
Joseph Doherty ee1423db7a docs: document dashboard DisableLogin / AutoLoginUser dev flag 2026-06-16 08:16:45 -04:00
Joseph Doherty 4993057ed5 feat(dashboard): add auto-login auth handler for DisableLogin mode 2026-06-16 08:14:51 -04:00
Joseph Doherty 073252d7a6 feat(dashboard): add DisableLogin + AutoLoginUser options (default off) 2026-06-16 08:11:10 -04:00
Joseph Doherty a894717319 docs(plan): dashboard disable-login implementation plan + tasks 2026-06-16 07:54:43 -04:00
Joseph Doherty 0856cd4f93 docs: dashboard disable-login design + save session-resilience tasklist snapshot 2026-06-16 07:47:31 -04:00
Joseph Doherty c446bef64f chore(plan): mark session-resilience tasks 1-12 complete 2026-06-16 07:29:17 -04:00
Joseph Doherty c7a7cd1e5e fix(sessions): tidy replay filter/comment; zero OldestAvailableSequence when no gap
- EventStreamService: remove dead per-item sequence guard in the replay
  loop (RegisterWithReplay already returns only events > afterSequence)
  and correct the comment that falsely claimed a "per-item constraint
  filter" is applied; the event stream has no per-event constraint
  filtering today.
- SessionEventDistributor.RegisterWithReplay: set oldestAvailableSequence=0
  when gap==false so the implementation matches the documented contract
  (OldestAvailableSequence is meaningful only when Gap is true).
  Update the two RegisterWithReplay tests that asserted the old non-zero
  value in the no-gap path.
- RegisterSubscriber: remove stray blank line at method entry.
- SessionEventDistributorTests: add RegisterWithReplay_AfterDispose_
  ThrowsObjectDisposedException to pin nested-lock disposal behavior.
2026-06-16 07:28:37 -04:00
Joseph Doherty 36ab8d15f1 feat(sessions): replay-on-reconnect with ReplayGap sentinel 2026-06-16 07:22:19 -04:00
Joseph Doherty 042f5e3d82 fix(sessions): expose DetachGraceSeconds in effective-config; single clock; close reconnect-vs-sweep race
- EffectiveSessionConfiguration: add DetachGraceSeconds field; GatewayConfigurationProvider
  forwards value.Sessions.DetachGraceSeconds (blocker fix).
- GatewaySession.InvokeAsync and ReadEventsAsync: switch TouchClientActivity calls from
  DateTimeOffset.UtcNow to _eventStreaming.TimeProvider.GetUtcNow() so Task 12 fake-clock
  control works end-to-end (split-clock fix).
- TOCTOU fix: add TryBeginCloseIfExpired(now, out alreadyClosing) to GatewaySession that
  re-checks IsLeaseExpiredCore/IsDetachGraceExpiredCore AND _activeEventSubscriberCount==0
  under _syncRoot before transitioning to Closing; CloseExpiredLeasesAsync calls it before
  CloseSessionCoreAsync so a reattach that wins the race leaves the session Ready/usable.
- Minors: lease-expiry-takes-precedence comment in CloseExpiredLeasesAsync; TOCTOU comment
  block; sweep-cycle latency note added to SessionOptions.DetachGraceSeconds XML doc and to
  GatewayConfiguration.md DetachGraceSeconds row.
- New tests: TryBeginCloseIfExpired_ReattachedSubscriberWinsRace_DeclinesClose (GatewaySession),
  CloseExpiredLeasesAsync_DoesNotCloseSessionThatReattachedBeforeSweepCloses (SessionManager),
  plus IsLeaseExpiredCore/IsDetachGraceExpiredCore private helpers used by the guard.
2026-06-16 07:11:59 -04:00
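The TOCTOU fix above amounts to re-checking the expiry condition and the subscriber count under the same lock that performs the state transition. A minimal sketch of that pattern follows, assuming a simplified session with only a detach-grace timer; the class and member names are hypothetical, and the real `TryBeginCloseIfExpired` also reports `alreadyClosing` and checks lease expiry, which this sketch omits.

```java
/** Hypothetical sketch: decide and transition under one lock so a reattach is never lost. */
final class SessionCloseSketch {
    enum State { READY, CLOSING }

    private final Object syncRoot = new Object();
    private final long detachGraceMillis;
    private State state = State.READY;
    private int activeSubscribers;
    private long lastDetachMillis;

    SessionCloseSketch(long detachGraceMillis, long createdAtMillis) {
        this.detachGraceMillis = detachGraceMillis;
        this.lastDetachMillis = createdAtMillis;
    }

    void attachSubscriber() {
        synchronized (syncRoot) {
            activeSubscribers++;
        }
    }

    void detachSubscriber(long nowMillis) {
        synchronized (syncRoot) {
            if (activeSubscribers > 0) {
                activeSubscribers--;
            }
            if (activeSubscribers == 0) {
                lastDetachMillis = nowMillis; // grace window starts when the last subscriber leaves
            }
        }
    }

    /** Returns true only if this call moved the session from READY to CLOSING. */
    boolean tryBeginCloseIfExpired(long nowMillis) {
        synchronized (syncRoot) {
            if (state == State.CLOSING) {
                return false;
            }
            boolean graceExpired = nowMillis - lastDetachMillis >= detachGraceMillis;
            if (activeSubscribers > 0 || !graceExpired) {
                return false; // a reattach won the race, or the window is still open
            }
            state = State.CLOSING;
            return true;
        }
    }

    State state() {
        synchronized (syncRoot) {
            return state;
        }
    }

    public static void main(String[] args) {
        SessionCloseSketch session = new SessionCloseSketch(1_000, 0);
        session.attachSubscriber();
        System.out.println(session.tryBeginCloseIfExpired(5_000)); // subscriber attached, close declined
    }
}
```

Because the sweep calls the guard immediately before closing, a subscriber that reattaches between "session looks expired" and "begin close" leaves the session in the ready state.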
Joseph Doherty db95f8644f feat(sessions): detach-grace retention window for reconnect 2026-06-16 06:15:46 -04:00
Joseph Doherty 85e4334bb7 docs(contracts): clarify ReplayGap drain-exclusion and resume-boundary semantics
Add two comment-only clarifications to mxaccess_gateway.proto (no field/number changes):
1. MxEvent.replay_gap: states the sentinel is ONLY ever set on StreamEvents events
   and is ALWAYS unset on DrainEventsReply events, preventing Task 12 from
   accidentally emitting it on the drain path and removing any client ambiguity.
2. ReplayGap.oldest_available_sequence: clarifies that the value IS retained and
   replayable, and that a client resumes gap-free by setting
   after_worker_sequence = oldest_available_sequence - 1 in the next
   StreamEventsRequest (receiving events starting at oldest_available_sequence).
Regenerated Generated/MxaccessGateway.cs (comment-only XML-doc change).
2026-06-16 05:09:47 -04:00
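The resume-boundary rule in point 2 is a one-line calculation. The sketch below shows it in isolation; the class and method names are hypothetical, and the guard assumes worker sequences start at 1, which the commit message does not state.

```java
/** Hypothetical sketch of the gap-free resume calculation described above. */
final class ReplayResumeSketch {
    /**
     * Computes the after_worker_sequence to send in the next StreamEventsRequest
     * so that delivery restarts at oldest_available_sequence.
     */
    static long resumeAfter(long oldestAvailableSequence) {
        if (oldestAvailableSequence < 1) {
            // Assumption: sequences start at 1, so smaller values carry no resume point.
            throw new IllegalArgumentException("oldest_available_sequence must be >= 1");
        }
        return oldestAvailableSequence - 1;
    }

    public static void main(String[] args) {
        long oldest = 42; // value taken from a received ReplayGap
        System.out.println("after_worker_sequence = " + resumeAfter(oldest));
    }
}
```

Since the server replays only events strictly greater than `after_worker_sequence`, subtracting one makes the oldest retained event the first one delivered.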
Joseph Doherty 9beb67c1e9 feat(contracts): add ReplayGap signal to MxEvent for reconnect replay 2026-06-16 05:04:17 -04:00
Joseph Doherty 056bb39a4d test(gateway): deterministic multi-subscriber test sync + cap-rejection specificity
Replace Task.Delay(100) subscriber-attachment races with WaitForSubscriberCountAsync,
a polling gate on GatewaySession.ActiveEventSubscriberCount so Advise and event fan-out
cannot proceed until all subscribers are confirmed registered.

Fix WaitForMessageCountAsync to honor a single CancellationTokenSource deadline across
the poll loop rather than resetting the timeout on each intermediate wakeup.

Add ordering comment in the cancellation test explaining why stream1Task must be awaited
before AllowNextEvent to guarantee sub1 is unregistered before the 2nd event is fanned.

Assert capException.Status.Detail contains "maximum" in the cap test to distinguish
EventSubscriberLimitReached (AllowMultiple=true cap) from EventSubscriberAlreadyActive
(single-subscriber rejection) — both map to ResourceExhausted.

Extract shared ConfigureCommandReply helper and move FakeWorkerProcess to TestSupport/
so both fake-worker test classes reference one definition.
2026-06-15 16:34:12 -04:00
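The `WaitForMessageCountAsync` fix described above (one deadline for the whole poll loop instead of a timeout that resets on every wakeup) can be illustrated with a small polling helper. This is a sketch of the general technique only: the actual helper is C# and uses a `CancellationTokenSource`, and every name below is hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical sketch: poll until a count is reached, bounded by a single deadline. */
final class DeadlinePollSketch {
    /**
     * Returns true once currentCount reaches expected, or false when the overall
     * timeout elapses (or the thread is interrupted) first.
     */
    static boolean waitForCount(
            IntSupplier currentCount, int expected, Duration timeout, Duration pollInterval) {
        // The deadline is computed once, so intermediate wakeups never extend it.
        long deadline = System.nanoTime() + timeout.toNanos();
        while (currentCount.getAsInt() < expected) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            try {
                TimeUnit.NANOSECONDS.sleep(Math.min(remaining, pollInterval.toNanos()));
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        boolean reached = waitForCount(() -> 2, 2, Duration.ofMillis(100), Duration.ofMillis(5));
        System.out.println("reached = " + reached);
    }
}
```

Capping each sleep at the remaining time keeps the total wait close to the requested timeout regardless of how many times the loop wakes up.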
Joseph Doherty 9dd97a27f1 test(gateway): end-to-end multi-subscriber fan-out via fake worker
Adds GatewayEndToEndMultiSubscriberTests covering three scenarios
through the real gRPC StreamEvents path with AllowMultipleEventSubscribers=true:
- Fan-out: two concurrent StreamEvents RPCs both receive every event the fake
  worker emits, in the same order (WorkerSequence matches, values indexed).
- Independent cancellation: cancelling one subscriber's stream leaves the other
  receiving subsequent events; the session stays usable.
- Cap enforcement: with MaxEventSubscribersPerSession=2 a third concurrent
  StreamEvents is rejected with gRPC ResourceExhausted while the first two
  keep streaming.

Extends RecordingServerStreamWriter<T> with WaitForMessageCountAsync to
allow deterministic bounded-timeout awaits for an N-message count without
fixed sleeps.
2026-06-15 16:09:58 -04:00
Joseph Doherty 281e00b300 refactor(sessions): derive subscriber mode from session config; close Task 8 review nits
Remove the per-call allowMultipleSubscribers param from AttachEventSubscriber and
derive the mode internally from _eventStreaming.AllowMultipleEventSubscribers — the
same source SessionEventDistributor uses for singleSubscriberMode — so the two can
never structurally diverge. The maxSubscribers cap param is kept because
MaxEventSubscribersPerSession lives in SessionOptions, which the session does not hold
directly (only EventOptions flows through SessionEventStreaming).

Other nits:
- SubscriberCount XML doc clarifies it includes internal subscribers and differs from
  GatewaySession.ActiveEventSubscriberCount (external/gRPC only).
- SingleSubscriberMode_LoneExternalOverflow test: add Assert.Equal(1, observedSet) guard
  before the value assertion so the test cannot pass vacuously if the handler never fired.
- GatewayOptionsValidator.ValidateSessions: add explanatory code comment documenting why
  !AllowMultipleEventSubscribers && MaxEventSubscribersPerSession > 1 is NOT rejected as
  a hard error (the default config ships with this combination; the cap is simply unused
  in single-subscriber mode, not a behavior bug).
- GatewaySession.DetachEventSubscriber: add Debug.Assert before the clamp so a genuine
  double-decrement surfaces in debug builds.
2026-06-15 15:53:27 -04:00
Joseph Doherty ac42783e36 feat(sessions): multi-subscriber cap enforcement + mode-gated FailFast 2026-06-15 15:32:08 -04:00
54 changed files with 5747 additions and 633 deletions
+3 -1
@@ -85,6 +85,8 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1
When source code changes, build and test the affected component before reporting work done. If the change crosses component boundaries, build each affected component — don't rely on a single top-level build:
**Run targeted tests per task, never the full suite each time.** When executing a plan task-by-task, run only the tests that exercise the code that task touched (`dotnet test --filter "FullyQualifiedName~<TestClass>"`, or the per-task test named in the plan). The full gateway suite is slow and leaves orphaned testhost processes — run it at most once per phase (after a related batch of tasks lands), not after every task.
| Changed area | Required verification |
|---|---|
| Contracts or `.proto` files | regenerate generated code, then build gateway, worker, and every generated client touched by the contract |
@@ -116,7 +118,7 @@ External analysis sources referenced by design docs:
Gateway gRPC clients authenticate with an API key in metadata: `authorization: Bearer mxgw_<key-id>_<secret>`. Keys are stored hashed (with a peppered SHA) in a gateway-owned SQLite DB (default `C:\ProgramData\MxGateway\gateway-auth.db`). Scopes (`session`, `invoke`, `event`, `metadata`, `admin`) gate specific RPCs; missing → `Unauthenticated`, insufficient → `PermissionDenied`. The `apikey` subcommand on the server exe manages keys; see `src/ZB.MOM.WW.MxGateway.Server/Security/Authentication/`.
-Dashboard auth is LDAP-backed (separate from the gRPC API-key model). `/login` binds against `MxGateway:Ldap` and maps the user's LDAP groups to `Admin` or `Viewer` via `MxGateway:Dashboard:GroupToRole`, then issues an HTTP-only secure `__Host-MxGatewayDashboard` cookie. SignalR hubs at `/hubs/{snapshot,alarms,events}` accept either the cookie or a 30-minute bearer minted at `/hubs/token`. `Dashboard:AllowAnonymousLocalhost` bypasses auth on loopback when enabled.
+Dashboard auth is LDAP-backed (separate from the gRPC API-key model). `/login` binds against `MxGateway:Ldap` and maps the user's LDAP groups to `Admin` or `Viewer` via `MxGateway:Dashboard:GroupToRole`, then issues an HTTP-only secure `__Host-MxGatewayDashboard` cookie. SignalR hubs at `/hubs/{snapshot,alarms,events}` accept either the cookie or a 30-minute bearer minted at `/hubs/token`. `Dashboard:AllowAnonymousLocalhost` bypasses auth on loopback when enabled. `Dashboard:DisableLogin` (default `false`) auto-authenticates every dashboard request — including remote browsers — as `Dashboard:AutoLoginUser` (default `multi-role`) with both Admin and Viewer roles; dev/test only, never enable in production.
## Process / Platform Notes
@@ -6,6 +6,9 @@ dependencies {
implementation project(':zb-mom-ww-mxgateway-client')
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
implementation "info.picocli:picocli:${picocliVersion}"
testImplementation "io.grpc:grpc-inprocess:${grpcVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
}
application {
@@ -131,6 +131,11 @@ public final class MxGatewayCli implements Callable<Integer> {
}
static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
return commandLine(clientFactory, new GrpcGalaxyClientFactory());
}
static CommandLine commandLine(
MxGatewayCliClientFactory clientFactory, GalaxyClientFactory galaxyClientFactory) {
CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory));
commandLine.addSubcommand("version", new VersionCommand());
commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory));
@@ -152,11 +157,11 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory));
commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
-commandLine.addSubcommand("galaxy-test-connection", new GalaxyTestConnectionCommand());
-commandLine.addSubcommand("galaxy-last-deploy", new GalaxyDeployTimeCommand());
-commandLine.addSubcommand("galaxy-discover", new GalaxyDiscoverCommand());
-commandLine.addSubcommand("galaxy-browse", new GalaxyBrowseCommand());
-commandLine.addSubcommand("galaxy-watch", new GalaxyWatchCommand());
+commandLine.addSubcommand("galaxy-test-connection", new GalaxyTestConnectionCommand(galaxyClientFactory));
+commandLine.addSubcommand("galaxy-last-deploy", new GalaxyDeployTimeCommand(galaxyClientFactory));
+commandLine.addSubcommand("galaxy-discover", new GalaxyDiscoverCommand(galaxyClientFactory));
+commandLine.addSubcommand("galaxy-browse", new GalaxyBrowseCommand(galaxyClientFactory));
+commandLine.addSubcommand("galaxy-watch", new GalaxyWatchCommand(galaxyClientFactory));
commandLine.addSubcommand("batch", new BatchCommand(clientFactory));
return commandLine;
}
@@ -359,14 +364,20 @@ public final class MxGatewayCli implements Callable<Integer> {
}
abstract static class GalaxyCommand implements Callable<Integer> {
final GalaxyClientFactory galaxyClientFactory;
@Mixin
CommonOptions common = new CommonOptions();
@Option(names = "--json", description = "Write JSON output.")
boolean json;
GalaxyCommand(GalaxyClientFactory galaxyClientFactory) {
this.galaxyClientFactory = galaxyClientFactory;
}
GalaxyRepositoryClient connect() {
-return GalaxyRepositoryClient.connect(common.resolved().toClientOptions());
+return galaxyClientFactory.connect(common.resolved().toClientOptions());
}
}
@@ -375,6 +386,10 @@ public final class MxGatewayCli implements Callable<Integer> {
aliases = {"galaxy-test"},
description = "Calls GalaxyRepository.TestConnection.")
static final class GalaxyTestConnectionCommand extends GalaxyCommand {
GalaxyTestConnectionCommand(GalaxyClientFactory galaxyClientFactory) {
super(galaxyClientFactory);
}
@Override
public Integer call() {
try (GalaxyRepositoryClient client = connect()) {
@@ -399,6 +414,10 @@ public final class MxGatewayCli implements Callable<Integer> {
aliases = {"galaxy-deploy-time"},
description = "Calls GalaxyRepository.GetLastDeployTime.")
static final class GalaxyDeployTimeCommand extends GalaxyCommand {
GalaxyDeployTimeCommand(GalaxyClientFactory galaxyClientFactory) {
super(galaxyClientFactory);
}
@Override
public Integer call() {
try (GalaxyRepositoryClient client = connect()) {
@@ -423,6 +442,10 @@ public final class MxGatewayCli implements Callable<Integer> {
@Command(name = "galaxy-discover", description = "Calls GalaxyRepository.DiscoverHierarchy.")
static final class GalaxyDiscoverCommand extends GalaxyCommand {
GalaxyDiscoverCommand(GalaxyClientFactory galaxyClientFactory) {
super(galaxyClientFactory);
}
@Override
public Integer call() {
try (GalaxyRepositoryClient client = connect()) {
@@ -458,6 +481,10 @@ public final class MxGatewayCli implements Callable<Integer> {
name = "galaxy-browse",
description = "Browses the Galaxy hierarchy via GalaxyRepository.BrowseChildren.")
static final class GalaxyBrowseCommand extends GalaxyCommand {
GalaxyBrowseCommand(GalaxyClientFactory galaxyClientFactory) {
super(galaxyClientFactory);
}
@Spec
private CommandSpec spec;
@@ -718,6 +745,10 @@ public final class MxGatewayCli implements Callable<Integer> {
name = "galaxy-watch",
description = "Streams GalaxyRepository.WatchDeployEvents until cancelled.")
static final class GalaxyWatchCommand extends GalaxyCommand {
GalaxyWatchCommand(GalaxyClientFactory galaxyClientFactory) {
super(galaxyClientFactory);
}
@Option(
names = "--last-seen-deploy-time",
description =
@@ -1725,6 +1756,10 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
interface GalaxyClientFactory {
GalaxyRepositoryClient connect(MxGatewayClientOptions options);
}
interface MxGatewayCliClientFactory {
MxGatewayCliClient connect(CommonOptions options);
}
@@ -1781,6 +1816,13 @@ public final class MxGatewayCli implements Callable<Integer> {
MxEventStream streamEventsAfter(long afterWorkerSequence);
}
static final class GrpcGalaxyClientFactory implements GalaxyClientFactory {
@Override
public GalaxyRepositoryClient connect(MxGatewayClientOptions options) {
return GalaxyRepositoryClient.connect(options);
}
}
static final class GrpcMxGatewayCliClientFactory implements MxGatewayCliClientFactory {
@Override
public MxGatewayCliClient connect(CommonOptions options) {
@@ -0,0 +1,216 @@
package com.zb.mom.ww.mxgateway.cli;
import com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient;
import com.zb.mom.ww.mxgateway.client.MxGatewayClient;
import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions;
import galaxy_repository.v1.GalaxyRepositoryGrpc;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyReply;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyRequest;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
/**
* Test fixture that stands up an in-process gRPC server hosting scripted fake
* {@code MxAccessGateway} and {@code GalaxyRepository} service implementations,
* so the real Java client types ({@link MxGatewayClient} /
* {@link GalaxyRepositoryClient}) can be driven over a real channel.
*
* <p>The real streaming wrappers ({@code MxEventStream} /
* {@code DeployEventStream}) have package-private constructors and
* {@link GalaxyRepositoryClient} is {@code final}, so the streaming and galaxy
* CLI commands cannot be exercised through the lightweight {@code FakeSession}
* seam. Driving the real client over an in-process channel against scripted
* services is the clean alternative; Tasks 5 and 6 add the CLI assertions on
* top of this fixture.
*
* <p>Scripted payloads are settable via constructor args or setters. Each
* instance uses a unique server name so harnesses do not collide. The
* {@code directExecutor()} wiring keeps all dispatch on the calling thread, so
* no background threads are leaked.
*/
final class InProcessGatewayHarness implements AutoCloseable {
private final String serverName;
private final Server server;
private final ManagedChannel channel;
private final FakeGatewayService fakeGateway;
private final FakeGalaxyService fakeGalaxy;
/** Starts a harness with empty scripted payloads; populate via setters. */
InProcessGatewayHarness() {
this(List.of(), List.of(), List.of());
}
/**
* Starts a harness with the supplied scripted payloads.
*
* @param scriptedEvents events {@code streamEvents} pushes before completing
* @param scriptedObjects objects {@code discoverHierarchy} returns (single page)
* @param scriptedDeployEvents events {@code watchDeployEvents} streams before completing
*/
InProcessGatewayHarness(
List<MxEvent> scriptedEvents,
List<GalaxyObject> scriptedObjects,
List<DeployEvent> scriptedDeployEvents) {
this.serverName = "mxgw-cli-harness-" + UUID.randomUUID();
this.fakeGateway = new FakeGatewayService(scriptedEvents);
this.fakeGalaxy = new FakeGalaxyService(scriptedObjects, scriptedDeployEvents);
try {
this.server = InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(fakeGateway)
.addService(fakeGalaxy)
.build()
.start();
} catch (IOException error) {
throw new IllegalStateException("failed to start in-process gateway harness", error);
}
this.channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
}
/** Replaces the scripted {@code streamEvents} payload. */
void setScriptedEvents(List<MxEvent> events) {
fakeGateway.scriptedEvents.clear();
fakeGateway.scriptedEvents.addAll(events);
}
/** Replaces the scripted {@code discoverHierarchy} payload. */
void setScriptedObjects(List<GalaxyObject> objects) {
fakeGalaxy.scriptedObjects.clear();
fakeGalaxy.scriptedObjects.addAll(objects);
}
/** Replaces the scripted {@code watchDeployEvents} payload. */
void setScriptedDeployEvents(List<DeployEvent> deployEvents) {
fakeGalaxy.scriptedDeployEvents.clear();
fakeGalaxy.scriptedDeployEvents.addAll(deployEvents);
}
/**
* Returns the in-process channel into the scripted services.
*
* @return the managed channel; lifecycle owned by the harness
*/
ManagedChannel channel() {
return channel;
}
/**
* Builds a real {@link MxGatewayClient} over the in-process channel.
*
* @return a client borrowing the harness channel
*/
MxGatewayClient gatewayClient() {
return new MxGatewayClient(channel, testOptions());
}
/**
* Builds a real {@link GalaxyRepositoryClient} over the in-process channel.
*
* @return a client borrowing the harness channel
*/
GalaxyRepositoryClient galaxyClient() {
return new GalaxyRepositoryClient(channel, testOptions());
}
private static MxGatewayClientOptions testOptions() {
return MxGatewayClientOptions.builder()
.endpoint("in-process")
.apiKey("mxgw_test_secret")
.plaintext(true)
.callTimeout(Duration.ofSeconds(5))
.build();
}
@Override
public void close() {
channel.shutdownNow();
server.shutdownNow();
}
private static ProtocolStatus ok() {
return ProtocolStatus.newBuilder()
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
.build();
}
/** Scripted fake of the {@code MxAccessGateway} service. */
private static final class FakeGatewayService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase {
private final List<MxEvent> scriptedEvents = new CopyOnWriteArrayList<>();
FakeGatewayService(List<MxEvent> scriptedEvents) {
this.scriptedEvents.addAll(scriptedEvents);
}
@Override
public void streamEvents(
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
StreamObserver<MxEvent> responseObserver) {
for (MxEvent event : scriptedEvents) {
responseObserver.onNext(event);
}
responseObserver.onCompleted();
}
@Override
public void closeSession(
CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
responseObserver.onNext(CloseSessionReply.newBuilder()
.setSessionId(request.getSessionId())
.setFinalState(SessionState.SESSION_STATE_CLOSED)
.setProtocolStatus(ok())
.build());
responseObserver.onCompleted();
}
}
/** Scripted fake of the {@code GalaxyRepository} service. */
private static final class FakeGalaxyService extends GalaxyRepositoryGrpc.GalaxyRepositoryImplBase {
private final List<GalaxyObject> scriptedObjects = new CopyOnWriteArrayList<>();
private final List<DeployEvent> scriptedDeployEvents = new CopyOnWriteArrayList<>();
FakeGalaxyService(List<GalaxyObject> scriptedObjects, List<DeployEvent> scriptedDeployEvents) {
this.scriptedObjects.addAll(scriptedObjects);
this.scriptedDeployEvents.addAll(scriptedDeployEvents);
}
@Override
public void discoverHierarchy(
DiscoverHierarchyRequest request, StreamObserver<DiscoverHierarchyReply> responseObserver) {
List<GalaxyObject> snapshot = new ArrayList<>(scriptedObjects);
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.setTotalObjectCount(snapshot.size())
.addAllObjects(snapshot)
.setNextPageToken("")
.build());
responseObserver.onCompleted();
}
@Override
public void watchDeployEvents(
WatchDeployEventsRequest request, StreamObserver<DeployEvent> responseObserver) {
for (DeployEvent event : scriptedDeployEvents) {
responseObserver.onNext(event);
}
responseObserver.onCompleted();
}
}
}
@@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.zb.mom.ww.mxgateway.client.MxGatewayAlarmFeedSubscription;
import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayInputStream;
@@ -31,6 +32,7 @@ import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxEventFamily;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
@@ -305,6 +307,183 @@ final class MxGatewayCliTests {
assertTrue(run.output().contains("\"tagAddress\":\"TestMachine_002.TestChangingInt\""));
}
// ---- read-bulk / write-bulk / write2-bulk subcommands ----
@Test
void readBulkCommandForwardsTimeoutAndPrintsResults() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"read-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--items", "TestMachine_001.TestInt,TestMachine_002.TestInt",
"--timeout-ms", "750",
"--json");
assertEquals(0, run.exitCode());
assertEquals(750, factory.client.session.lastReadBulkTimeoutMs);
String out = run.output();
assertTrue(out.contains("\"command\":\"read-bulk\""), out);
assertTrue(out.contains("\"tagAddress\":\"TestMachine_001.TestInt\""), out);
assertTrue(out.contains("\"tagAddress\":\"TestMachine_002.TestInt\""), out);
assertTrue(out.contains("\"itemHandle\""), out);
assertTrue(out.contains("\"wasCached\""), out);
assertTrue(out.contains("\"quality\""), out);
}
@Test
void writeBulkCommandParsesTypedValuesAndPrintsResults() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"write-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--item-handles", "100,101",
"--type", "int32",
"--values", "111,222",
"--user-id", "5",
"--json");
assertEquals(0, run.exitCode());
List<WriteBulkEntry> entries = factory.client.session.lastWriteBulkEntries;
assertEquals(2, entries.size());
assertEquals(111, entries.get(0).getValue().getInt32Value());
assertEquals(222, entries.get(1).getValue().getInt32Value());
assertEquals(5, entries.get(0).getUserId());
assertEquals(5, entries.get(1).getUserId());
String out = run.output();
assertTrue(out.contains("\"command\":\"write-bulk\""), out);
assertTrue(out.contains("\"itemHandle\":100"), out);
assertTrue(out.contains("\"wasSuccessful\":true"), out);
}
@Test
void write2BulkCommandForwardsTimestampAndPrintsResults() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"write2-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--item-handles", "100",
"--type", "string",
"--values", "hello",
"--timestamp", "2026-05-20T00:00:00Z",
"--json");
assertEquals(0, run.exitCode());
List<Write2BulkEntry> entries = factory.client.session.lastWrite2BulkEntries;
assertEquals(1, entries.size());
assertTrue(entries.get(0).hasTimestampValue(), "expected timestampValue to be set");
String out = run.output();
assertTrue(out.contains("\"command\":\"write2-bulk\""), out);
assertTrue(out.contains("\"itemHandle\":100"), out);
assertTrue(out.contains("\"wasSuccessful\":true"), out);
}
@Test
void writeSecuredBulkCommandForwardsUserIdsAndPrintsResults() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"write-secured-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--item-handles", "100,101",
"--type", "int32",
"--values", "10,20",
"--current-user-id", "7",
"--verifier-user-id", "8",
"--json");
assertEquals(0, run.exitCode());
List<WriteSecuredBulkEntry> entries = factory.client.session.lastWriteSecuredBulkEntries;
assertEquals(2, entries.size());
assertEquals(7, entries.get(0).getCurrentUserId());
assertEquals(8, entries.get(0).getVerifierUserId());
assertEquals(7, entries.get(1).getCurrentUserId());
assertEquals(8, entries.get(1).getVerifierUserId());
assertEquals(10, entries.get(0).getValue().getInt32Value());
assertEquals(20, entries.get(1).getValue().getInt32Value());
String out = run.output();
assertTrue(out.contains("\"command\":\"write-secured-bulk\""), out);
assertTrue(out.contains("\"itemHandle\":100"), out);
assertTrue(out.contains("\"wasSuccessful\":true"), out);
}
@Test
void writeSecured2BulkCommandForwardsTimestampAndUserIdsAndPrintsResults() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"write-secured2-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--item-handles", "100",
"--type", "string",
"--values", "hello",
"--timestamp", "2026-05-20T00:00:00Z",
"--current-user-id", "7",
"--verifier-user-id", "8",
"--json");
assertEquals(0, run.exitCode());
List<WriteSecured2BulkEntry> entries = factory.client.session.lastWriteSecured2BulkEntries;
assertEquals(1, entries.size());
assertEquals(7, entries.get(0).getCurrentUserId());
assertEquals(8, entries.get(0).getVerifierUserId());
assertTrue(entries.get(0).hasTimestampValue(), "expected timestampValue to be set");
String out = run.output();
assertTrue(out.contains("\"command\":\"write-secured2-bulk\""), out);
assertTrue(out.contains("\"itemHandle\":100"), out);
assertTrue(out.contains("\"wasSuccessful\":true"), out);
}
@Test
void benchReadBulkCommandEmitsJsonSchemaKeys() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"bench-read-bulk",
"--session-id", "session-cli",
"--server-handle", "42",
"--items", "TestMachine_001.TestInt",
"--iterations", "3",
"--warmup", "0",
"--json");
assertEquals(0, run.exitCode());
String out = run.output();
assertTrue(out.contains("\"command\":\"bench-read-bulk\""), out);
assertTrue(out.contains("\"iterations\""), out);
assertTrue(out.contains("\"warmup\""), out);
assertTrue(out.contains("\"tagCount\""), out);
assertTrue(out.contains("\"resultCount\""), out);
assertTrue(out.contains("\"successCount\""), out);
assertTrue(out.contains("\"cachedCount\""), out);
assertTrue(out.contains("\"avgMs\""), out);
assertTrue(out.contains("\"minMs\""), out);
assertTrue(out.contains("\"maxMs\""), out);
}
@Test
void closeSessionCommandPrintsReply() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"close-session",
"--session-id", "session-cli",
"--json");
assertEquals(0, run.exitCode());
assertTrue(factory.client.closeCalled);
String out = run.output();
assertTrue(out.contains("\"command\":\"close-session\""), out);
assertTrue(out.contains("SESSION_STATE_CLOSED"), out);
}
@Test
void unsubscribeBulkCommandPrintsResults() {
CliRun run = execute(
@@ -516,6 +695,143 @@ final class MxGatewayCliTests {
+ " out=\n" + run.output() + "\nerr=\n" + run.errors());
}
@Test
void streamEventsRendersScriptedEventsIncludingHighUint64Sequence() {
// Drive the REAL MxGatewayClient / MxGatewaySession / MxEventStream
// path over the in-process harness (Task 4), so the production
// stream-events command exercises the real streaming wrapper instead
// of a hand-written FakeSession seam.
//
// The high worker-sequence (-1L == 18446744073709551615 unsigned)
// covers the unsigned-rendering regression: worker_sequence is a
// proto uint64 carried as a Java long with the top bit set. The CLI's
// --json path renders it through protobuf's JsonFormat, which prints
// uint64 as an unsigned decimal STRING; a naive %d render would print
// a negative number instead.
MxEvent dataChange = MxEvent.newBuilder()
.setFamily(MxEventFamily.MX_EVENT_FAMILY_ON_DATA_CHANGE)
.setSessionId("session-cli")
.setServerHandle(7)
.setItemHandle(42)
.setWorkerSequence(5L)
.build();
MxEvent highSequence = MxEvent.newBuilder()
.setFamily(MxEventFamily.MX_EVENT_FAMILY_OPERATION_COMPLETE)
.setSessionId("session-cli")
.setServerHandle(9)
.setItemHandle(99)
// -1L unsigned == 18446744073709551615 (top bit set).
.setWorkerSequence(-1L)
.build();
try (InProcessGatewayHarness harness = new InProcessGatewayHarness()) {
harness.setScriptedEvents(List.of(dataChange, highSequence));
CliRun run = execute(
new HarnessClientFactory(harness),
"stream-events",
"--session-id",
"session-cli",
"--json");
assertEquals(0, run.exitCode(), "errors:\n" + run.errors());
String out = run.output();
// Scripted event fields surface in the JSON render.
assertTrue(out.contains("\"family\":\"MX_EVENT_FAMILY_ON_DATA_CHANGE\""), out);
assertTrue(out.contains("\"family\":\"MX_EVENT_FAMILY_OPERATION_COMPLETE\""), out);
assertTrue(out.contains("\"serverHandle\":7"), out);
assertTrue(out.contains("\"itemHandle\":42"), out);
// The low sequence renders as the unsigned decimal string "5".
assertTrue(out.contains("\"workerSequence\":\"5\""), out);
// The high sequence renders as the FULL unsigned decimal, not -1.
assertTrue(out.contains("\"workerSequence\":\"18446744073709551615\""), out);
assertFalse(out.contains("\"workerSequence\":\"-1\""), out);
assertFalse(out.contains("\"workerSequence\":-1"), out);
}
}
// ---- galaxy-discover / galaxy-watch over the in-process harness (Task 6) ----
@Test
void galaxyDiscoverPrintsPagedHierarchyJson() {
// Drive the REAL GalaxyRepositoryClient.discoverHierarchy path over the
// in-process harness (Task 6), so the production galaxy-discover command
// exercises the real client paging loop against scripted objects instead
// of a hand-written seam. The harness's fake discoverHierarchy returns a
// single page carrying the scripted objects.
GalaxyObject area = GalaxyObject.newBuilder()
.setGobjectId(101)
.setTagName("Area001")
.setContainedName("Area001")
.setBrowseName("Area001")
.setIsArea(true)
.build();
GalaxyObject pump = GalaxyObject.newBuilder()
.setGobjectId(202)
.setTagName("Pump001")
.setContainedName("Pump001")
.setBrowseName("Pump001")
.setParentGobjectId(101)
.build();
try (InProcessGatewayHarness harness = new InProcessGatewayHarness()) {
harness.setScriptedObjects(List.of(area, pump));
CliRun run = executeGalaxy(new HarnessGalaxyClientFactory(harness), "galaxy-discover", "--json");
assertEquals(0, run.exitCode(), "errors:\n" + run.errors());
String out = run.output();
// galaxy-discover --json renders {"command":..,"options":..,"objects":[{galaxyObjectMap}]}.
assertTrue(out.contains("\"command\":\"galaxy-discover\""), out);
// Both scripted objects render with the flattened object fields.
assertTrue(out.contains("\"tagName\":\"Area001\""), out);
assertTrue(out.contains("\"tagName\":\"Pump001\""), out);
assertTrue(out.contains("\"gobjectId\":101"), out);
assertTrue(out.contains("\"gobjectId\":202"), out);
assertTrue(out.contains("\"parentGobjectId\":101"), out);
assertTrue(out.contains("\"isArea\":true"), out);
}
}
@Test
void galaxyWatchRendersScriptedDeployEvents() {
// Drive the REAL GalaxyRepositoryClient.watchDeployEvents / DeployEventStream
// path over the in-process harness. The harness's fake watchDeployEvents
// streams the scripted deploy events then completes; the CLI's --limit
// option caps how many it prints before closing the stream.
DeployEvent first = DeployEvent.newBuilder()
.setSequence(7L)
.setObjectCount(12)
.setAttributeCount(34)
.build();
DeployEvent second = DeployEvent.newBuilder()
.setSequence(8L)
.setObjectCount(13)
.setAttributeCount(35)
.build();
DeployEvent third = DeployEvent.newBuilder()
.setSequence(9L)
.setObjectCount(14)
.setAttributeCount(36)
.build();
try (InProcessGatewayHarness harness = new InProcessGatewayHarness()) {
harness.setScriptedDeployEvents(List.of(first, second, third));
// --limit 2 caps the feed at the first two scripted events.
CliRun run = executeGalaxy(
new HarnessGalaxyClientFactory(harness), "galaxy-watch", "--limit", "2", "--json");
assertEquals(0, run.exitCode(), "errors:\n" + run.errors());
String out = run.output();
// galaxy-watch --json prints one proto-JSON object per event; proto3
// JSON renders uint64 sequence as a decimal string.
assertTrue(out.contains("\"sequence\":\"7\""), out);
assertTrue(out.contains("\"objectCount\":12"), out);
assertTrue(out.contains("\"attributeCount\":34"), out);
assertTrue(out.contains("\"sequence\":\"8\""), out);
// --limit 2 must stop before printing the third scripted event.
assertFalse(out.contains("\"sequence\":\"9\""), out);
}
}
@Test
void batchCommandExecutesVersionAndEmitsEorMarker() {
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
@@ -630,6 +946,24 @@ final class MxGatewayCliTests {
return new CliRun(exitCode, output.toString(), errors.toString());
}
/**
* Runs a galaxy subcommand against the supplied {@link
* MxGatewayCli.GalaxyClientFactory}, wiring it through the production
* {@code commandLine(gatewayFactory, galaxyFactory)} two-arg overload (Task 3
* seam). The gateway factory slot is unused by galaxy commands, so a plain
* {@link FakeClientFactory} fills it. Mirrors {@link #execute} for the
* gateway commands.
*/
private static CliRun executeGalaxy(MxGatewayCli.GalaxyClientFactory galaxyFactory, String... args) {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
picocli.CommandLine commandLine = MxGatewayCli.commandLine(new FakeClientFactory(), galaxyFactory);
commandLine.setOut(new PrintWriter(output, true));
commandLine.setErr(new PrintWriter(errors, true));
int exitCode = commandLine.execute(args);
return new CliRun(exitCode, output.toString(), errors.toString());
}
private record CliRun(int exitCode, String output, String errors) {
}
@@ -672,6 +1006,50 @@ final class MxGatewayCliTests {
}
}
/**
* Factory that wires the production {@link MxGatewayCli.GrpcMxGatewayCliClient}
* adapter around the harness's REAL
* {@link com.zb.mom.ww.mxgateway.client.MxGatewayClient}, so the
* stream-events command runs against the in-process scripted gateway over
* a real channel (exercising the real {@code MxEventStream}). Mirrors the
* production {@code GrpcMxGatewayCliClientFactory}, swapping only the
* client construction for the harness-backed client.
*/
private static final class HarnessClientFactory implements MxGatewayCli.MxGatewayCliClientFactory {
private final InProcessGatewayHarness harness;
private HarnessClientFactory(InProcessGatewayHarness harness) {
this.harness = harness;
}
@Override
public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) {
return new MxGatewayCli.GrpcMxGatewayCliClient(
harness.gatewayClient(), options.spec.commandLine().getOut());
}
}
/**
* Galaxy factory that returns the harness's REAL {@link
* com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient} over the in-process
* scripted {@code GalaxyRepository} service, so galaxy-discover / galaxy-watch
* exercise the real client (paging loop, deploy-event stream wrapper) against
* scripted payloads. Mirrors the production {@code GrpcGalaxyClientFactory},
* swapping only client construction for the harness-backed client.
*/
private static final class HarnessGalaxyClientFactory implements MxGatewayCli.GalaxyClientFactory {
private final InProcessGatewayHarness harness;
private HarnessGalaxyClientFactory(InProcessGatewayHarness harness) {
this.harness = harness;
}
@Override
public com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient connect(MxGatewayClientOptions options) {
return harness.galaxyClient();
}
}
private static final class OverflowingFakeClient implements MxGatewayCli.MxGatewayCliClient {
private final PrintWriter out;
@@ -815,6 +1193,12 @@ final class MxGatewayCliTests {
private boolean adviseCalled;
private MxValue lastWriteValue;
private String lastPingMessage;
private long lastReadBulkTimeoutMs;
private List<String> lastReadBulkItems;
private List<WriteBulkEntry> lastWriteBulkEntries;
private List<Write2BulkEntry> lastWrite2BulkEntries;
private List<WriteSecuredBulkEntry> lastWriteSecuredBulkEntries;
private List<WriteSecured2BulkEntry> lastWriteSecured2BulkEntries;
@Override
public MxCommandReply pingRaw(String message) {
@@ -912,6 +1296,8 @@ final class MxGatewayCliTests {
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout) {
lastReadBulkTimeoutMs = timeout.toMillis();
lastReadBulkItems = new ArrayList<>(items);
List<BulkReadResult> results = new ArrayList<>();
for (int index = 0; index < items.size(); index++) {
results.add(BulkReadResult.newBuilder()
@@ -927,6 +1313,7 @@ final class MxGatewayCliTests {
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
lastWriteBulkEntries = new ArrayList<>(entries);
List<BulkWriteResult> results = new ArrayList<>();
for (WriteBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
@@ -940,6 +1327,7 @@ final class MxGatewayCliTests {
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
lastWrite2BulkEntries = new ArrayList<>(entries);
List<BulkWriteResult> results = new ArrayList<>();
for (Write2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
@@ -953,6 +1341,7 @@ final class MxGatewayCliTests {
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
lastWriteSecuredBulkEntries = new ArrayList<>(entries);
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecuredBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
@@ -966,6 +1355,7 @@ final class MxGatewayCliTests {
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
lastWriteSecured2BulkEntries = new ArrayList<>(entries);
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecured2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
+7 -1
@@ -37,8 +37,10 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"DetachGraceSeconds": 30,
"AllowMultipleEventSubscribers": false,
"MaxEventSubscribersPerSession": 8
"MaxEventSubscribersPerSession": 8,
"WorkerReadyWaitTimeoutMs": 0
},
"Events": {
"QueueCapacity": 10000,
@@ -126,8 +128,10 @@ to avoid accidental large allocations from malformed or oversized frames.
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
| `MxGateway:Sessions:DetachGraceSeconds` | `30` | Detach-grace retention window. When positive, a session whose last external (gRPC) event-stream subscriber drops is retained in `Ready` for this many seconds so a client can reconnect; if no external subscriber re-attaches within the window, the lease monitor closes it with `detach-grace-expired`. The internal dashboard mirror does not count as an external subscriber, so a dashboard-only session still enters detach-grace. `0` disables retention and reverts to closing only on normal lease expiry. Must be zero or greater. Reconnect/replay itself is implemented separately (Task 12); this option controls retention and expiry only. The effective close happens within the next sweep cycle after the window elapses — up to `LeaseSweepIntervalSeconds` after expiry. Operators wanting a firm minimum retention bound should set `DetachGraceSeconds` greater than `LeaseSweepIntervalSeconds`. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
| `MxGateway:Sessions:WorkerReadyWaitTimeoutMs` | `0` | Bounded time, in milliseconds, the gateway will wait for a worker to reach `Ready` when the session is already `Ready` but the worker state has transiently diverged (e.g. `Handshaking` after a heartbeat blip). Applies only to transient worker states; terminal states (`Faulted`/`Closing`/`Closed`/no worker) fail fast immediately regardless of this setting. `0` (the default) disables the wait and preserves the original fail-fast behavior. Must be greater than or equal to zero. |
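All numeric session options must be greater than zero, except `DetachGraceSeconds` and `WorkerReadyWaitTimeoutMs`, which must be greater than or equal to zero (`0` disables the corresponding behavior).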
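The `WorkerReadyWaitTimeoutMs` semantics in the table above can be illustrated with a minimal sketch. It is written in Java to match the CLI test code in this change, not the gateway's own implementation; `WorkerReadyWait`, `WorkerState`, `awaitReady`, and the `pollMs` parameter are hypothetical names chosen for illustration. It assumes a simple poll loop measured against a single monotonic clock.

```java
import java.util.function.Supplier;

// Illustrative only: a bounded wait for a worker to reach READY.
// All names here are hypothetical and do not come from the gateway source.
final class WorkerReadyWait {
    enum WorkerState { READY, HANDSHAKING, FAULTED, CLOSING, CLOSED, NONE }

    // Terminal states fail fast regardless of the configured timeout.
    static boolean isTerminal(WorkerState state) {
        return state == WorkerState.FAULTED
                || state == WorkerState.CLOSING
                || state == WorkerState.CLOSED
                || state == WorkerState.NONE;
    }

    // Returns true if the worker is READY now or becomes READY within timeoutMs.
    // timeoutMs == 0 disables the wait and keeps the fail-fast behavior.
    static boolean awaitReady(Supplier<WorkerState> probe, long timeoutMs, long pollMs) {
        long start = System.nanoTime(); // one monotonic clock for the whole loop
        long budgetNanos = timeoutMs * 1_000_000L;
        while (true) {
            WorkerState state = probe.get();
            if (state == WorkerState.READY) {
                return true;
            }
            if (isTerminal(state) || timeoutMs <= 0) {
                return false;
            }
            long remainingNanos = budgetNanos - (System.nanoTime() - start);
            if (remainingNanos <= 0) {
                return false; // budget exhausted while still in a transient state
            }
            long sleepMs = Math.max(1L, Math.min(pollMs, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With `timeoutMs` set to `0` the loop returns after a single probe, which corresponds to the default fail-fast behavior described in the table.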
@@ -169,6 +173,8 @@ events (a "gap") and must re-snapshot; whatever is still retained is replayed.
| `MxGateway:Dashboard:RecentSessionLimit` | `200` | Maximum number of session summaries projected into each dashboard snapshot. |
| `MxGateway:Dashboard:ShowTagValues` | `false` | Reserved display control for tag values. The dashboard does not show full tag values by default. |
| `MxGateway:Dashboard:GroupToRole` | _(empty)_ | LDAP group → dashboard role mapping. Keys are LDAP group names (short CN or full DN — leading-RDN match). Values must be `Admin` (read/write, API-key CRUD) or `Viewer` (read-only). A user whose LDAP groups don't intersect this map cannot sign in; with no mapping at all, only the loopback bypass admits anyone. |
| `MxGateway:Dashboard:DisableLogin` | `false` | Dev/test only. When `true`, replaces the cookie authentication handler with `DashboardAutoLoginAuthenticationHandler`, which auto-authenticates every dashboard request — including requests from remote browsers, not just loopback — as `AutoLoginUser` holding both `Administrator` and `Viewer` roles. No login form, LDAP bind, or cookie is involved. A loud one-time startup warning is logged. Differs from `AllowAnonymousLocalhost`: `DisableLogin` mints a real authenticated principal (so role-gated write affordances appear), whereas `AllowAnonymousLocalhost` satisfies the authorization requirement on loopback only without minting a principal (write affordances stay hidden). Never enable in production. |
| `MxGateway:Dashboard:AutoLoginUser` | `(null)` | Username stamped on the synthetic principal when `DisableLogin` is `true`. Default `(null)` — a null or blank value falls back to `multi-role`. Has no effect when `DisableLogin` is `false`. |
`SnapshotIntervalMilliseconds` must be greater than zero. `RecentFaultLimit`
and `RecentSessionLimit` must be greater than or equal to zero.
+31
@@ -442,6 +442,37 @@ authorizes every request, and `MxGateway:Dashboard:AllowAnonymousLocalhost`
requests always require an authenticated principal carrying at least the
Viewer role.
### DisableLogin dev bypass
`MxGateway:Dashboard:DisableLogin` (default `false`) is a third bypass for
dev and test environments where LDAP is unavailable or irrelevant.
When the flag is `true`, the `DashboardAuthenticator`-backed cookie handler is
replaced by `DashboardAutoLoginAuthenticationHandler`, registered under the
same scheme name (`MxGateway.Dashboard`). The handler auto-authenticates every
incoming request — including requests from remote browsers, not just loopback —
as a principal for `MxGateway:Dashboard:AutoLoginUser` (default `multi-role`)
holding both the `Administrator` and `Viewer` role claims.
The same-scheme-name swap is intentional: every authorization policy
(`MxGateway.Dashboard.Viewer`, `MxGateway.Dashboard.Admin`,
`MxGateway.Dashboard.HubClients`) resolves the `MxGateway.Dashboard` scheme,
so the handler replacement requires zero changes to policies, Razor page
attributes, or hub authorization attributes. `UseAuthentication()` stamps the
principal on `HttpContext.User` for the full HTTP pipeline, the Blazor circuit,
and the SignalR hubs uniformly — there is no separate path for each surface.
This differs from `AllowAnonymousLocalhost`: that flag satisfies the Viewer
authorization requirement on loopback without minting an authenticated
principal, so role-gated write affordances (Admin-only API-key CRUD, Close/Kill
controls) stay hidden. `DisableLogin` mints a real multi-role principal, so
those affordances appear — which is the point for dev scenarios where a
developer needs the full Admin surface without standing up LDAP.
A loud one-time startup warning is logged when `DisableLogin` is `true`. The
gRPC API-key authentication path is untouched; only the dashboard cookie
surface is affected. Never enable in production.
### Hub bearer flow
SignalR connections cannot reuse the `__Host-` cookie when the JS client
+32 -3
@@ -72,7 +72,7 @@ private void EnsureSessionCapacity()
}
```
`SessionManager` also defines three close-reason constants — `DefaultCloseReason` (`"client-close"`), `GatewayShutdownReason` (`"gateway-shutdown"`), and `LeaseExpiredReason` (`"lease-expired"`) — so that the metrics and worker shutdown paths agree on a fixed vocabulary.
`SessionManager` also defines four close-reason constants — `DefaultCloseReason` (`"client-close"`), `GatewayShutdownReason` (`"gateway-shutdown"`), `LeaseExpiredReason` (`"lease-expired"`), and `DetachGraceExpiredReason` (`"detach-grace-expired"`) — so that the metrics and worker shutdown paths agree on a fixed vocabulary.
### SessionRegistry (ISessionRegistry)
@@ -150,7 +150,8 @@ public sealed record SessionCloseResult(
|------|---------|
| `SessionNotFound` | The session id is not in the registry. |
| `SessionNotReady` | The session or its `IWorkerClient` is not in `Ready` state. |
| `EventSubscriberAlreadyActive` | A second event subscriber attached when only one is allowed. |
| `EventSubscriberAlreadyActive` | A second event subscriber attached in single-subscriber mode (`AllowMultipleEventSubscribers` is `false`). |
| `EventSubscriberLimitReached` | In multi-subscriber mode, an attach exceeded `MaxEventSubscribersPerSession` concurrent external subscribers. |
| `EventQueueOverflow` | Reserved for the worker event channel overflow path. |
| `SessionLimitExceeded` | `MaxSessions` is in use. |
| `OpenFailed` | `OpenSessionAsync` failed; the inner exception carries the cause. |
@@ -192,10 +193,38 @@ The order — fault, deregister, dispose, release slot, record metric, log, reth
While `Ready`, callers reach the worker through `SessionManager.InvokeAsync` or `ReadEventsAsync`. Both delegate to `GatewaySession`, which checks the state under lock and updates `LastClientActivityAt` on every invocation. `GatewaySession` also exposes typed bulk helpers (`AddItemBulkAsync`, `SubscribeBulkAsync`, etc.) that wrap `WorkerCommand` round-trips and translate non-`Ok` `ProtocolStatus` replies into `SessionManagerException` with `SessionNotReady`.
Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false the second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. Active event subscribers keep the session lease from expiring until the stream is disposed.
Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false (single-subscriber mode) a second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. When it is true, up to `MaxEventSubscribersPerSession` concurrent external subscribers are allowed and the next attach throws `EventSubscriberLimitReached`. The count-check-and-increment is atomic under the session lock, so concurrent attaches can never exceed the cap. The gateway-owned internal dashboard mirror subscriber is registered directly on the distributor and does not count toward the cap. Active event subscribers keep the session lease from expiring until the stream is disposed.
`FailFast` event backpressure faults the whole session only in single-subscriber mode; in multi-subscriber mode it degrades to a per-subscriber disconnect so one slow consumer never faults a session shared by others. The session passes its mode to the `SessionEventDistributor` at construction, so this decision is made on the fixed mode rather than a live subscriber-count snapshot.
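The atomic count-check-and-increment described above can be sketched as follows. This is a minimal illustration in Java, not the gateway's code; `SubscriberCap`, `Lease`, and `attach` are hypothetical names, and the exception messages simply reuse the error-code names from the table. It assumes a single lock guards the external subscriber count.

```java
// Illustrative only: cap enforcement where the check and the increment
// happen inside one critical section. Names are hypothetical.
final class SubscriberCap {
    // A lease whose close() does not throw checked exceptions.
    interface Lease extends AutoCloseable {
        @Override
        void close();
    }

    private final Object syncRoot = new Object();
    private final boolean allowMultiple;
    private final int maxSubscribers;
    private int activeExternal;

    SubscriberCap(boolean allowMultiple, int maxSubscribers) {
        this.allowMultiple = allowMultiple;
        this.maxSubscribers = maxSubscribers;
    }

    Lease attach() {
        synchronized (syncRoot) {
            // Check and increment under the same lock, so two concurrent
            // attaches can never both pass the check at the cap boundary.
            if (!allowMultiple && activeExternal >= 1) {
                throw new IllegalStateException("EventSubscriberAlreadyActive");
            }
            if (allowMultiple && activeExternal >= maxSubscribers) {
                throw new IllegalStateException("EventSubscriberLimitReached");
            }
            activeExternal++;
        }
        return new Lease() {
            private boolean released;

            @Override
            public void close() {
                synchronized (syncRoot) {
                    if (released) {
                        return; // disposing a lease twice releases only one slot
                    }
                    released = true;
                    activeExternal--;
                }
            }
        };
    }

    int active() {
        synchronized (syncRoot) {
            return activeExternal;
        }
    }
}
```

An internal subscriber such as the dashboard mirror would bypass `attach` entirely in this sketch, which is one way to keep it out of the count.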
Sessions open with `MxGateway:Sessions:DefaultLeaseSeconds` (default 1800) added to the open timestamp. Unary client activity refreshes the lease by the same duration. `ExtendLease` and `IsLeaseExpired` cooperate with `SessionManager.CloseExpiredLeasesAsync`, which iterates a registry snapshot and closes any session whose lease has expired with `LeaseExpiredReason`. `SessionLeaseMonitorHostedService` runs that sweep every `MxGateway:Sessions:LeaseSweepIntervalSeconds` seconds (default 30).
#### Detach-grace retention
`MxGateway:Sessions:DetachGraceSeconds` (default 30) is a bounded retention window kept after a session's *last external (gRPC) event-stream subscriber* drops, so a client can reconnect to the same session instead of having it torn down on the first stream disconnect. While the window is open the session stays `Ready` and fully usable — worker commands continue to work and a reconnecting subscriber re-attaches normally. Because retention is keyed on the *external* subscriber count (`_activeEventSubscriberCount`), and the gateway-owned internal dashboard mirror registers directly on the distributor with `isInternal: true` and is therefore *not* counted, a session whose only remaining subscriber is the dashboard mirror still enters detach-grace.
Mechanically: when the last external subscriber detaches and `DetachGraceSeconds > 0`, `DetachEventSubscriber` stamps `DetachedAtUtc` from the session's `TimeProvider` under `_syncRoot` (the detach→grace-start transition). `AttachEventSubscriber` clears `DetachedAtUtc` under the same lock when a subscriber re-attaches (the reattach→grace-cancel transition), so both transitions and the sweeper's read all serialize on `_syncRoot`. `SessionManager.CloseExpiredLeasesAsync` checks `IsDetachGraceExpired(now)` alongside `IsLeaseExpired(now)`: a session detached for at least `DetachGraceSeconds` with no active external subscriber is closed by the same lease sweep, with the distinct `DetachGraceExpiredReason` (`"detach-grace-expired"`) so operators can tell a short reconnect-window expiry from a long idle-lease expiry. Setting `DetachGraceSeconds` to `0` disables retention and reverts to the original behavior: a detached session is retained only until its normal lease expires.
`DetachGraceSeconds` controls retention and expiry only; the reconnect/replay path that re-attaches a dropped client to a retained session is described in [Reconnect and replay](#reconnect-and-replay).
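The detach, re-attach, and sweep transitions described in this section can be sketched as a small state holder. This is a Java illustration under stated assumptions, not the gateway's C# implementation; `DetachGrace` and its method names are hypothetical, and the caller passes `now` explicitly in place of a `TimeProvider`.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: detach-grace bookkeeping guarded by one lock.
final class DetachGrace {
    private final Object syncRoot = new Object();
    private final Duration grace;
    private int activeExternal;
    private Instant detachedAt; // null while attached or when grace is disabled

    DetachGrace(Duration grace) {
        this.grace = grace;
    }

    // Re-attach cancels any pending grace window.
    void attach() {
        synchronized (syncRoot) {
            activeExternal++;
            detachedAt = null;
        }
    }

    // The last external detach starts the grace window, if one is configured.
    void detach(Instant now) {
        synchronized (syncRoot) {
            if (activeExternal == 0) {
                return;
            }
            activeExternal--;
            boolean graceEnabled = !grace.isZero() && !grace.isNegative();
            if (activeExternal == 0 && graceEnabled) {
                detachedAt = now;
            }
        }
    }

    // Sweeper check: detached for at least the grace duration with nobody attached.
    boolean isDetachGraceExpired(Instant now) {
        synchronized (syncRoot) {
            return detachedAt != null
                    && activeExternal == 0
                    && !now.isBefore(detachedAt.plus(grace));
        }
    }
}
```

Because the sweeper only runs periodically, an expired window in this sketch would be acted on at the next sweep rather than at the exact instant of expiry.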
#### Reconnect and replay
A client that drops mid-stream reconnects by re-issuing `StreamEvents` with `StreamEventsRequest.after_worker_sequence` set to the last `worker_sequence` it observed. A non-zero `after_worker_sequence` means *resume*; `0` means *fresh stream* and behaves exactly as a first-time subscribe — no replay, no sentinel.
On a resume, `EventStreamService.StreamEventsAsync` attaches through `GatewaySession.AttachEventSubscriberWithReplay`, which calls `SessionEventDistributor.RegisterWithReplay`. That method snapshots the session's replay ring for events newer than `after_worker_sequence` **and** registers the live subscriber inside a single `_replayLock` critical section. This atomicity is what makes the replay→live handoff free of gaps and duplicates: the pump appends each event to the replay ring (under `_replayLock`) before fanning it to subscriber channels, so relative to that one critical section every event is either in the replay snapshot or fanned into the freshly-registered live channel — never both observably, never neither.
The handoff is sealed by a watermark. `RegisterWithReplay` returns `LiveResumeSequence` (the highest replayed sequence, or `after_worker_sequence` when nothing was replayed); `EventStreamService` then filters the live channel to events strictly greater than that watermark. An event that was both included in the replay snapshot and — racing the registration — also written to the live channel has `worker_sequence <= LiveResumeSequence`, so the live filter drops it exactly once (no duplicate), while every newer event is delivered (no gap). The same per-item filter governs replayed and live events identically, so a constrained or resuming caller never sees a replayed event it could not have seen live.
Emit order on a resumed stream:
1. **ReplayGap sentinel (only when events were evicted).** If the requested `after_worker_sequence` predates the oldest event still retained — i.e. events in the open interval were dropped by capacity or age eviction and are unrecoverable — the gateway first yields a single sentinel `MxEvent` with `replay_gap` populated (`requested_after_sequence` = the requested watermark, `oldest_available_sequence` = the oldest still-retained sequence). The sentinel carries the session id; its `family` is `UNSPECIFIED`, its `body` oneof is unset, and no per-item fields are populated. It is an explicit, documented control signal — *not* a synthesized MXAccess event — telling the client to discard local state and re-snapshot. A client that wants to resume without another gap should set `after_worker_sequence = oldest_available_sequence - 1` on its next request.
2. **Retained replay batch.** The still-retained events newer than the requested watermark, in ascending `worker_sequence` order.
3. **Live events**, resuming strictly after `LiveResumeSequence`.
When `after_worker_sequence` is inside the retained window (nothing was evicted), step 1 is skipped: the stream replays the retained tail then resumes live with no sentinel.
The ReplayGap sentinel is emitted **only** on the `StreamEvents` server stream and only to the resuming subscriber — it is never fanned to other subscribers and never appears in `DrainEventsReply` (the diagnostic drain path is untouched). Replay retention itself is bounded by `MxGateway:Events:ReplayBufferCapacity` (count) and `ReplayRetentionSeconds` (age); see [Configuration](GatewayConfiguration.md).
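The emit order and the watermark filter above can be sketched as a pure function over sequence numbers. This is a Java illustration, not the gateway's implementation; `ReplayResume`, `Plan`, `plan`, and `deliverLive` are hypothetical names. It assumes the replay ring is available as an ascending list, that a gap means at least one sequence between the requested watermark and the oldest retained event is missing, and that `worker_sequence` values compare as unsigned 64-bit integers. An empty ring is treated as "no gap" here, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: planning a resumed stream from a replay ring snapshot.
final class ReplayResume {
    // gap: whether a ReplayGap sentinel would be emitted first.
    // replay: retained sequences newer than the requested watermark, ascending.
    // liveResumeSequence: live events at or below this are dropped as duplicates.
    record Plan(boolean gap, long oldestAvailable, List<Long> replay, long liveResumeSequence) {}

    // Intended for resume requests only; after == 0 means a fresh stream
    // and would skip replay entirely.
    static Plan plan(List<Long> retainedAscending, long after) {
        List<Long> replay = new ArrayList<>();
        for (long seq : retainedAscending) {
            if (Long.compareUnsigned(seq, after) > 0) {
                replay.add(seq);
            }
        }
        boolean gap = false;
        long oldest = 0L;
        if (!retainedAscending.isEmpty()) {
            oldest = retainedAscending.get(0);
            // Gap when sequences strictly between `after` and `oldest` are gone.
            gap = Long.compareUnsigned(oldest, after) > 0
                    && Long.compareUnsigned(oldest - after, 1L) > 0;
        }
        long liveResume = replay.isEmpty() ? after : replay.get(replay.size() - 1);
        return new Plan(gap, oldest, replay, liveResume);
    }

    // Live filter: deliver only events strictly newer than the watermark.
    static boolean deliverLive(long seq, long liveResumeSequence) {
        return Long.compareUnsigned(seq, liveResumeSequence) > 0;
    }
}
```

Under these assumptions, requesting `after = oldestAvailable - 1` yields no gap and replays the full retained tail, which matches the advice given for the sentinel.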
### Close
`GatewaySession.CloseAsync` is serialized by a per-session `SemaphoreSlim` (`_closeLock`) so only one close runs at a time, but every read/write of `_state` still passes through `_syncRoot` (via `TryBeginClose` and `MarkClosed`). The close path therefore obeys the same lock discipline as `TransitionTo` / `MarkFaulted`: it transitions to `Closing`, asks the worker client to shut down within `ShutdownTimeout`, and on success transitions to `Closed`. `DisposeAsync` waits on `_closeLock` once before disposing the semaphore so an in-flight close's `Release()` cannot race against the dispose. If `WorkerClient.ShutdownAsync` throws, the session falls back to `IWorkerClient.Kill` (forced close):
@@ -1,18 +1,18 @@
{
"planPath": "docs/plans/2026-06-15-session-resilience.md",
"tasks": [
{"id": 108, "subject": "Task 1: Add OwnerKeyId to the session", "status": "pending"},
{"id": 109, "subject": "Task 2: SessionEventDistributor skeleton", "status": "pending", "blockedBy": [108]},
{"id": 110, "subject": "Task 3: Bounded replay ring buffer", "status": "pending", "blockedBy": [109]},
{"id": 111, "subject": "Task 4: Rewire AttachEventSubscriber + EventStreamService onto distributor", "status": "pending", "blockedBy": [110]},
{"id": 112, "subject": "Task 5: Per-subscriber backpressure isolation", "status": "pending", "blockedBy": [111]},
{"id": 113, "subject": "Task 6: Dashboard broadcaster becomes a distributor subscriber", "status": "pending", "blockedBy": [111]},
{"id": 114, "subject": "Task 7: Remove validator block + add subscriber cap option", "status": "pending", "blockedBy": [112]},
{"id": 115, "subject": "Task 8: Subscriber-lease collection + cap enforcement", "status": "pending", "blockedBy": [114]},
{"id": 116, "subject": "Task 9: Multi-subscriber end-to-end test (FakeWorkerHarness)", "status": "pending", "blockedBy": [115]},
{"id": 117, "subject": "Task 10: Proto - ReplayGap signal", "status": "pending", "blockedBy": [116]},
{"id": 118, "subject": "Task 11: Detach-grace session retention", "status": "pending", "blockedBy": [117]},
{"id": 119, "subject": "Task 12: Replay-on-reconnect + emit ReplayGap", "status": "pending", "blockedBy": [118, 110]},
{"id": 108, "subject": "Task 1: Add OwnerKeyId to the session", "status": "completed"},
{"id": 109, "subject": "Task 2: SessionEventDistributor skeleton", "status": "completed", "blockedBy": [108]},
{"id": 110, "subject": "Task 3: Bounded replay ring buffer", "status": "completed", "blockedBy": [109]},
{"id": 111, "subject": "Task 4: Rewire AttachEventSubscriber + EventStreamService onto distributor", "status": "completed", "blockedBy": [110]},
{"id": 112, "subject": "Task 5: Per-subscriber backpressure isolation", "status": "completed", "blockedBy": [111]},
{"id": 113, "subject": "Task 6: Dashboard broadcaster becomes a distributor subscriber", "status": "completed", "blockedBy": [111]},
{"id": 114, "subject": "Task 7: Remove validator block + add subscriber cap option", "status": "completed", "blockedBy": [112]},
{"id": 115, "subject": "Task 8: Subscriber-lease collection + cap enforcement", "status": "completed", "blockedBy": [114]},
{"id": 116, "subject": "Task 9: Multi-subscriber end-to-end test (FakeWorkerHarness)", "status": "completed", "blockedBy": [115]},
{"id": 117, "subject": "Task 10: Proto - ReplayGap signal", "status": "completed", "blockedBy": [116]},
{"id": 118, "subject": "Task 11: Detach-grace session retention", "status": "completed", "blockedBy": [117]},
{"id": 119, "subject": "Task 12: Replay-on-reconnect + emit ReplayGap", "status": "completed", "blockedBy": [118, 110]},
{"id": 120, "subject": "Task 13: Owner re-validation on reconnect", "status": "pending", "blockedBy": [119, 108]},
{"id": 121, "subject": "Task 14: Client ReplayGap handling - all 5 clients", "status": "pending", "blockedBy": [117]},
{"id": 122, "subject": "Task 15: Reconnect integration test (fake worker)", "status": "pending", "blockedBy": [119]},
@@ -0,0 +1,141 @@
# Dashboard "Disable Login" Dev Flag — Design
**Date:** 2026-06-16
**Status:** Approved (brainstorming) — ready for implementation plan.
## Goal
A config flag that **disables login in the gateway dashboard**. When enabled, every
request is auto-authenticated as a fixed dev user (default **`multi-role`**) holding
**both** dashboard roles (`Administrator` + `Viewer`), so no login form, cookie, or LDAP
bind is involved and the whole UI behaves as a signed-in multi-role admin. Default
**off**. Mirrors the sister project OtOpcUa's `Security:Auth:DisableLogin` feature.
## Why / scope
Speeds up dashboard testing against the remote dev boxes (10.100.0.48, wonder) with no
sign-in round-trip and no GLAuth dependency. Scope is the **dashboard cookie web surface
only** — the gRPC API-key auth path (`authorization: Bearer mxgw_…`) and its scopes are a
separate auth model and are **untouched**.
## Background (current dashboard auth, verified)
- Dashboard auth is a **single cookie scheme** `MxGateway.Dashboard` registered in
`Dashboard/DashboardServiceCollectionExtensions.cs::AddGatewayDashboard`
(`AddAuthentication("MxGateway.Dashboard").AddCookie(...)`), plus a bearer scheme
`MxGateway.Dashboard.HubToken` (`HubTokenAuthenticationHandler`) for SignalR hubs.
- Real login: `/login` → `DashboardAuthenticator.AuthenticateAsync` → shared
`ILdapAuthService` bind/search → `IGroupRoleMapper<string>` → `CreatePrincipal` builds a
`ClaimsPrincipal` (`ZbClaimTypes.Name`/`Username`/`DisplayName` + one `ZbClaimTypes.Role`
per role + `LdapGroupClaimType` group claims; identity authType = the cookie scheme,
nameType = `ZbClaimTypes.Name`, roleType = `ZbClaimTypes.Role`) → cookie sign-in.
- Authorization: a custom `DashboardAuthorizationHandler` evaluates
`DashboardAuthorizationRequirement`. Policies: `ViewerPolicy` (AnyDashboardRole),
`AdminPolicy` (AdminOnly), `HubClientsPolicy` (cookie **or** hub-token scheme,
AnyDashboardRole).
- Roles: exactly two — `DashboardRoles.Admin` (`"Administrator"`) and
`DashboardRoles.Viewer` (`"Viewer"`).
- **Existing escapes (important):** `DashboardAuthorizationHandler` already short-circuits
when `Authentication.Mode == Disabled` or when `Dashboard.AllowAnonymousLocalhost`
(default **true**) and the request is loopback. **But both only `context.Succeed(...)`
the authorization requirement — they do not mint an authenticated principal.** So
`HttpContext.User.Identity.IsAuthenticated` stays false, `Identity.Name` is null, and
role-gated `AuthorizeView` write affordances stay **hidden**. That is precisely why they
do not deliver the "logged-in multi-role admin" experience this feature needs.
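The gap described in the last bullet can be reproduced with the base class library alone. The sketch below is an illustration under stated assumptions: `AuthStateDemo`, `Describe`, `Anonymous`, and `Minted` are hypothetical names, and it uses the stock `ClaimTypes.Name` / `ClaimTypes.Role` rather than the project's `ZbClaimTypes`. It shows that an identity created without an authentication type reports `IsAuthenticated == false`, whatever an authorization handler later decides.

```csharp
using System;
using System.Security.Claims;

public static class AuthStateDemo
{
    // Hypothetical helper: summarizes what role-gated UI would observe.
    public static string Describe(ClaimsPrincipal user) =>
        $"authenticated={user.Identity?.IsAuthenticated ?? false}, " +
        $"name={user.Identity?.Name ?? "<null>"}, " +
        $"admin={user.IsInRole("Administrator")}";

    // No authentication type: the shape of a user when authorization is merely
    // bypassed and nothing authenticated the request.
    public static ClaimsPrincipal Anonymous() =>
        new ClaimsPrincipal(new ClaimsIdentity());

    // Supplying an authentication type is what flips IsAuthenticated to true.
    public static ClaimsPrincipal Minted(string name) =>
        new ClaimsPrincipal(new ClaimsIdentity(
            new[]
            {
                new Claim(ClaimTypes.Name, name),
                new Claim(ClaimTypes.Role, "Administrator"),
                new Claim(ClaimTypes.Role, "Viewer"),
            },
            authenticationType: "MxGateway.Dashboard"));

    public static void Main()
    {
        Console.WriteLine(Describe(Anonymous()));
        Console.WriteLine(Describe(Minted("multi-role")));
    }
}
```

An authorization bypass can let the anonymous principal through a policy, but anything that inspects `Identity.Name` or `IsInRole` still sees an unauthenticated user. Only a minted principal changes that.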
## Approach (chosen: always-authenticating handler under the cookie scheme name)
When the flag is **on**, **replace the `.AddCookie(...)` registration with a custom
`AuthenticationHandler` registered under the *same* scheme name**
(`DashboardAuthenticationDefaults.AuthenticationScheme` = `"MxGateway.Dashboard"`). Its
`HandleAuthenticateAsync` **always returns `AuthenticateResult.Success`** with the fixed
dev principal (configured username, both roles), shaped identically to what
`DashboardAuthenticator.CreatePrincipal` produces. `UseAuthentication()` stamps that
principal on `HttpContext.User` for **every** request.
Registering under the cookie scheme name (not a new name) is the load-bearing detail: the
`ViewerPolicy`, `AdminPolicy`, and `HubClientsPolicy` all resolve through that scheme via
`DashboardAuthorizationHandler`'s role check, so they pass with **no policy or page
changes**. The HTTP pipeline (Razor pages, admin endpoints), the Blazor circuit
(`AuthorizeView`, `[CascadingParameter] AuthenticationState`), and the SignalR hubs are
all covered by the single `HttpContext.User` seam. Because the handler authenticates every
request, the feature is inherently **global** (all clients, including remote browsers) —
the agreed scope.
`SignInAsync`/`SignOutAsync` are no-ops (no cookie to write or clear; the next request
re-authenticates through the handler).
**Alternatives rejected:** (2) mint the principal inside the existing
`DashboardAuthorizationHandler` bypass branches — authorization runs after authentication,
so `HttpContext.User` is set too late for the Blazor auth state, and two seams must agree
(this is essentially today's half-feature); (3) a pipeline middleware plus a stubbed
`AuthenticationStateProvider` — two components to keep in sync, and a page request still
302s to `/login` unless `HttpContext.User` is also set.
## Components
### 1. Config surface — two new fields on `DashboardOptions` (`MxGateway:Dashboard:*`)
- `DisableLogin` (bool, default **false**).
- `AutoLoginUser` (string, default **`"multi-role"`** — this project's GLAuth
Administrator test user). Used as `Name`/`Username`/`DisplayName` of the minted
principal; blank falls back to `"multi-role"`.
"All permissions" = principal minted with **both** `DashboardRoles.Admin` and
`DashboardRoles.Viewer`.
### 2. `DashboardAutoLoginAuthenticationHandler`
`AuthenticationHandler<AuthenticationSchemeOptions>` implementing
`IAuthenticationSignInHandler`. Mirrors OtOpcUa's `AutoLoginAuthenticationHandler`, adapted
to this project's claim shape (`ZbClaimTypes.*`, `DashboardRoles.*`). Always `Success`;
SignIn/SignOut no-ops.
### 3. Wiring in `AddGatewayDashboard`
Read `MxGateway:Dashboard:DisableLogin` directly from `IConfiguration` at registration
time (the same idiom OtOpcUa uses, since scheme registration precedes options binding).
- On → `AddScheme<AuthenticationSchemeOptions, DashboardAutoLoginAuthenticationHandler>(
"MxGateway.Dashboard", _ => {})` in place of `AddCookie`; the `HubToken` scheme stays
registered unchanged.
- Off → existing `AddCookie(...)` path unchanged.
### 4. Safety
- Default **off**.
- A **loud one-time startup `LogWarning`** ("DASHBOARD LOGIN DISABLED
(MxGateway:Dashboard:DisableLogin=true) — every request authenticated as '{user}' with
full permissions (Administrator, Viewer). Dev/test only; never enable in production.")
via the same options `PostConfigure<ILoggerFactory>` idiom OtOpcUa uses.
- The existing `AllowAnonymousLocalhost` / `Authentication.Mode == Disabled` escapes are
left untouched — `DisableLogin` is orthogonal (it changes *authentication*, minting a
principal, not authorization bypass); when it is on the authorization handler's normal
role-check branch succeeds, so the bypass branches simply do not matter.
## Error handling / edge cases
- Blank `AutoLoginUser` → falls back to `"multi-role"` (handler never mints a nameless
principal).
- `/login` still renders when the flag is on but is pointless (the user is already
authenticated); `POST /login`'s `SignInAsync` is a no-op. `/logout` is likewise a no-op.
No redirect added (YAGNI).
- No interaction with the gRPC API-key path — that auth is entirely separate.
## Testing
- **Handler unit test:** `HandleAuthenticateAsync` returns `Success`;
`principal.Identity.IsAuthenticated`, `Identity.Name == AutoLoginUser`,
`IsInRole("Administrator")` && `IsInRole("Viewer")`; blank-user fallback.
- **Wiring / integration (`WebApplicationFactory`):** with `DisableLogin=true`, an
`AdminPolicy`-gated endpoint returns 200 with **no** cookie, and a `/hubs/*` negotiate
authorizes; the startup warning is emitted.
- **Regression:** with the flag off (default), the real cookie handler is still registered
and existing dashboard auth tests pass.
## Docs to update in the same change
- `docs/GatewayConfiguration.md` — new `MxGateway:Dashboard:DisableLogin` /
`AutoLoginUser` options.
- The dashboard design doc (`docs/GatewayDashboardDesign.md`).
- The CLAUDE.md dashboard-auth note (alongside the `AllowAnonymousLocalhost` mention).
## Scope / verification
Gateway-server-side only (.NET 10, x64) — builds and tests entirely on macOS. No worker,
no `.proto`, no client, no gRPC changes.
@@ -0,0 +1,447 @@
# Dashboard "Disable Login" Dev Flag — Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
**Goal:** Add a `MxGateway:Dashboard:DisableLogin` config flag that, when on, auto-authenticates every dashboard request as a fixed dev user (default `multi-role`) holding both dashboard roles — no login form, cookie, or LDAP bind.
**Architecture:** When the flag is on, the dashboard's `AddCookie(...)` registration is replaced by a custom `AuthenticationHandler` registered **under the same scheme name** (`MxGateway.Dashboard`) whose `HandleAuthenticateAsync` always succeeds with a multi-role principal. `UseAuthentication()` stamps that principal on `HttpContext.User` for every request, so every policy (Viewer/Admin/HubClients), the Blazor circuit, and the SignalR hubs see a signed-in admin with **zero policy or page changes**. Mirrors the sister project OtOpcUa's `Security:Auth:DisableLogin`.
**Tech Stack:** .NET 10 (x64) gateway server; ASP.NET Core authentication/authorization; xUnit. Server-side only — no worker, no `.proto`, no clients, no gRPC API-key changes. Builds and tests entirely on macOS.
**Design doc:** `docs/plans/2026-06-16-dashboard-disable-login-design.md`
**Key existing files (verified):**
- `src/ZB.MOM.WW.MxGateway.Server/Configuration/DashboardOptions.cs` — options bound from `MxGateway:Dashboard`.
- `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs::AddGatewayDashboard` — auth scheme + policy wiring.
- `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticationDefaults.cs` — scheme/policy name constants (`AuthenticationScheme = "MxGateway.Dashboard"`, `AdminPolicy`, `ViewerPolicy`).
- `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardRoles.cs`: `Admin = "Administrator"`, `Viewer = "Viewer"`.
- `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs::CreatePrincipal` — the claim shape to mirror (`ZbClaimTypes.Name/Username/DisplayName` + `ZbClaimTypes.Role` per role; identity authType = scheme, nameType = `ZbClaimTypes.Name`, roleType = `ZbClaimTypes.Role`).
- `ZbClaimTypes` (from `ZB.MOM.WW.Auth.AspNetCore`): `Name` (= `ClaimTypes.Name`), `Role` (= `ClaimTypes.Role`), `Username` (`"zb:username"`), `DisplayName` (`"zb:displayname"`).
- `src/ZB.MOM.WW.MxGateway.Server/Properties/AssemblyInfo.cs`: `InternalsVisibleTo("ZB.MOM.WW.MxGateway.Tests")` (so `internal` members are test-visible).
**Test conventions (verified):** no Moq/NSubstitute — hand-written stubs only. Integration-style tests build the real app with `GatewayApplication.Build(["--MxGateway:Dashboard:Key=value"])` and resolve services from `app.Services` (see `DashboardCookieOptionsTests`, `DashboardHubsRegistrationTests`). Run filtered tests only (per standing guidance), with `MSBUILDDISABLENODEREUSE=1`.
---
### Task 1: Config fields on `DashboardOptions`
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** none (Tasks 2/3 depend on these fields)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/DashboardOptions.cs`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsTests.cs`
**Step 1: Write the failing test** — add to `GatewayOptionsTests.cs`:
```csharp
[Fact]
public void DashboardOptions_DisableLogin_DefaultsToFalse()
{
    Assert.False(new DashboardOptions().DisableLogin);
}

[Fact]
public void DashboardOptions_AutoLoginUser_DefaultsToNull()
{
    Assert.Null(new DashboardOptions().AutoLoginUser);
}
```
(If `GatewayOptionsTests` lacks `using ZB.MOM.WW.MxGateway.Server.Configuration;`, add it.)
**Step 2: Run it, expect FAIL** (compile error: no such members)
Run: `MSBUILDDISABLENODEREUSE=1 dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj --filter "FullyQualifiedName~GatewayOptionsTests.DashboardOptions"`
**Step 3: Add the two `init` properties** to `DashboardOptions.cs` (place near `AllowAnonymousLocalhost`):
```csharp
/// <summary>
/// DEV/TEST ONLY. When true, the dashboard bypasses the login form entirely and
/// auto-authenticates EVERY request as <see cref="AutoLoginUser"/> holding both
/// dashboard roles (Administrator + Viewer). No cookie, no LDAP bind. Default false.
/// Unlike <see cref="AllowAnonymousLocalhost"/> (which only succeeds the authorization
/// requirement without authenticating), this mints a real principal, so the UI behaves
/// as a signed-in admin and applies to all clients (not just loopback). Never enable in
/// production. See docs/plans/2026-06-16-dashboard-disable-login-design.md.
/// </summary>
public bool DisableLogin { get; init; }
/// <summary>
/// Username minted for the auto-login principal when <see cref="DisableLogin"/> is true.
/// Null/blank falls back to the GLAuth Administrator test user <c>multi-role</c>.
/// </summary>
public string? AutoLoginUser { get; init; }
```
**Step 4: Run the test, expect PASS.**
**Step 5: Commit**
```bash
git add src/ZB.MOM.WW.MxGateway.Server/Configuration/DashboardOptions.cs src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsTests.cs
git commit -m "feat(dashboard): add DisableLogin + AutoLoginUser options (default off)"
```
---
### Task 2: `DashboardAutoLoginAuthenticationHandler` + unit tests
**Classification:** high-risk (security/auth code)
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 4
**Files:**
- Create: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAutoLoginAuthenticationHandler.cs`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardAutoLoginAuthenticationHandlerTests.cs`
**Step 1: Write the failing test** (`DashboardAutoLoginAuthenticationHandlerTests.cs`):
```csharp
using System.Security.Claims;
using ZB.MOM.WW.MxGateway.Server.Dashboard;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;

public sealed class DashboardAutoLoginAuthenticationHandlerTests
{
    [Fact]
    public void CreatePrincipal_MintsAuthenticatedMultiRoleUser()
    {
        ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal("multi-role");
        Assert.True(principal.Identity!.IsAuthenticated);
        Assert.Equal("multi-role", principal.Identity!.Name);
        Assert.True(principal.IsInRole(DashboardRoles.Admin));
        Assert.True(principal.IsInRole(DashboardRoles.Viewer));
    }

    [Theory]
    [InlineData(null)]
    [InlineData("")]
    [InlineData(" ")]
    public void CreatePrincipal_BlankUser_FallsBackToDefault(string? user)
    {
        ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal(user);
        Assert.Equal(DashboardAutoLoginAuthenticationHandler.DefaultUser, principal.Identity!.Name);
    }

    [Fact]
    public void CreatePrincipal_TrimsUser()
    {
        ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal(" multi-role ");
        Assert.Equal("multi-role", principal.Identity!.Name);
    }
}
```
**Step 2: Run it, expect FAIL** (type does not exist).
Run: `MSBUILDDISABLENODEREUSE=1 dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj --filter "FullyQualifiedName~DashboardAutoLoginAuthenticationHandlerTests"`
**Step 3: Implement** `DashboardAutoLoginAuthenticationHandler.cs`:
```csharp
using System.Security.Claims;
using System.Text.Encodings.Web;
using Microsoft.AspNetCore.Authentication;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Auth.AspNetCore;
using ZB.MOM.WW.MxGateway.Server.Configuration;

namespace ZB.MOM.WW.MxGateway.Server.Dashboard;

/// <summary>
/// Authentication handler used ONLY when <c>MxGateway:Dashboard:DisableLogin</c> is true.
/// Registered under the dashboard cookie scheme name
/// (<see cref="DashboardAuthenticationDefaults.AuthenticationScheme"/>), it authenticates
/// EVERY request as the configured dev user with both dashboard roles — no credential check,
/// no cookie, no LDAP bind. The minted principal mirrors the shape the real login
/// (<see cref="DashboardAuthenticator"/>) produces, so policies and the UI cannot tell it
/// apart. DEV/TEST ONLY; never enable in production.
/// </summary>
public sealed class DashboardAutoLoginAuthenticationHandler
    : AuthenticationHandler<AuthenticationSchemeOptions>, IAuthenticationSignInHandler
{
    /// <summary>Username used when <c>AutoLoginUser</c> is null or blank.</summary>
    public const string DefaultUser = "multi-role";

    private readonly string _user;

    /// <summary>Initializes the handler with scheme plumbing and the dashboard options.</summary>
    /// <param name="options">The per-scheme authentication options monitor.</param>
    /// <param name="logger">The logger factory the base handler uses.</param>
    /// <param name="encoder">The URL encoder the base handler uses.</param>
    /// <param name="gatewayOptions">Gateway options carrying the dashboard auto-login user.</param>
    public DashboardAutoLoginAuthenticationHandler(
        IOptionsMonitor<AuthenticationSchemeOptions> options,
        ILoggerFactory logger,
        UrlEncoder encoder,
        IOptions<GatewayOptions> gatewayOptions)
        : base(options, logger, encoder)
        => _user = gatewayOptions.Value.Dashboard.AutoLoginUser ?? DefaultUser;

    /// <summary>No-op: auto-login writes no cookie, so a sign-in has nothing to persist.</summary>
    /// <param name="user">Ignored.</param>
    /// <param name="properties">Ignored.</param>
    /// <returns>A completed task.</returns>
    public Task SignInAsync(ClaimsPrincipal user, AuthenticationProperties? properties) => Task.CompletedTask;

    /// <summary>No-op: there is no auth cookie to clear; the next request re-authenticates.</summary>
    /// <param name="properties">Ignored.</param>
    /// <returns>A completed task.</returns>
    public Task SignOutAsync(AuthenticationProperties? properties) => Task.CompletedTask;

    /// <inheritdoc />
    protected override Task<AuthenticateResult> HandleAuthenticateAsync()
    {
        ClaimsPrincipal principal = CreatePrincipal(_user);
        AuthenticationTicket ticket = new(principal, Scheme.Name);
        return Task.FromResult(AuthenticateResult.Success(ticket));
    }

    /// <summary>
    /// Builds the multi-role dev principal. Null/blank <paramref name="user"/> falls back to
    /// <see cref="DefaultUser"/>. Claim shape mirrors <see cref="DashboardAuthenticator"/>.
    /// </summary>
    /// <param name="user">The configured auto-login username (may be null/blank).</param>
    /// <returns>An authenticated principal holding both dashboard roles.</returns>
    internal static ClaimsPrincipal CreatePrincipal(string? user)
    {
        string name = string.IsNullOrWhiteSpace(user) ? DefaultUser : user.Trim();
        Claim[] claims =
        [
            new Claim(ClaimTypes.NameIdentifier, name),
            new Claim(ZbClaimTypes.Username, name),
            new Claim(ZbClaimTypes.Name, name),
            new Claim(ZbClaimTypes.DisplayName, name),
            new Claim(ZbClaimTypes.Role, DashboardRoles.Admin),
            new Claim(ZbClaimTypes.Role, DashboardRoles.Viewer),
        ];
        ClaimsIdentity identity = new(
            claims,
            DashboardAuthenticationDefaults.AuthenticationScheme,
            ZbClaimTypes.Name,
            ZbClaimTypes.Role);
        return new ClaimsPrincipal(identity);
    }
}
```
**Step 4: Run the test, expect PASS.**
**Step 5: Commit**
```bash
git add src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAutoLoginAuthenticationHandler.cs src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardAutoLoginAuthenticationHandlerTests.cs
git commit -m "feat(dashboard): add auto-login auth handler for DisableLogin mode"
```
---
### Task 3: Wire the scheme swap + startup warning + wiring/authorization tests
**Classification:** high-risk (security wiring)
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 2's handler)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardDisableLoginTests.cs` (create)
**Step 1: Write the failing tests** (`DashboardDisableLoginTests.cs`):
```csharp
using System.Security.Claims;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.MxGateway.Server;
using ZB.MOM.WW.MxGateway.Server.Dashboard;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;

public sealed class DashboardDisableLoginTests
{
    [Fact]
    public async Task DisableLoginOff_CookieSchemeUsesCookieHandler()
    {
        await using WebApplication app = GatewayApplication.Build([]);
        IAuthenticationSchemeProvider provider =
            app.Services.GetRequiredService<IAuthenticationSchemeProvider>();
        AuthenticationScheme? scheme = await provider.GetSchemeAsync(
            DashboardAuthenticationDefaults.AuthenticationScheme);
        Assert.NotNull(scheme);
        Assert.Equal(typeof(CookieAuthenticationHandler), scheme!.HandlerType);
    }

    [Fact]
    public async Task DisableLoginOn_CookieSchemeUsesAutoLoginHandler()
    {
        await using WebApplication app = GatewayApplication.Build(
            ["--MxGateway:Dashboard:DisableLogin=true"]);
        IAuthenticationSchemeProvider provider =
            app.Services.GetRequiredService<IAuthenticationSchemeProvider>();
        AuthenticationScheme? scheme = await provider.GetSchemeAsync(
            DashboardAuthenticationDefaults.AuthenticationScheme);
        Assert.NotNull(scheme);
        Assert.Equal(typeof(DashboardAutoLoginAuthenticationHandler), scheme!.HandlerType);
    }

    [Fact]
    public async Task DisableLoginOn_AutoLoginPrincipalSatisfiesAdminAndViewerPolicies()
    {
        await using WebApplication app = GatewayApplication.Build(
            ["--MxGateway:Dashboard:DisableLogin=true"]);
        IAuthorizationService authorization =
            app.Services.GetRequiredService<IAuthorizationService>();
        ClaimsPrincipal user = DashboardAutoLoginAuthenticationHandler.CreatePrincipal("multi-role");
        Assert.True((await authorization.AuthorizeAsync(
            user, resource: null, DashboardAuthenticationDefaults.AdminPolicy)).Succeeded);
        Assert.True((await authorization.AuthorizeAsync(
            user, resource: null, DashboardAuthenticationDefaults.ViewerPolicy)).Succeeded);
    }
}
```
> Note: `AuthorizeAsync` invokes the real `DashboardAuthorizationHandler` against the minted
> principal — its role-check branch succeeds independent of `HttpContext` (loopback check
> returns false with no request, and `Authentication.Mode` defaults to `ApiKey`), so this
> proves the policies pass purely on the minted roles.
**Step 2: Run them, expect FAIL** (the `DisableLoginOn_*` tests fail — handler not yet wired; cookie handler still registered).
Run: `MSBUILDDISABLENODEREUSE=1 dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj --filter "FullyQualifiedName~DashboardDisableLoginTests"`
**Step 3: Rewire `AddGatewayDashboard`.** In `DashboardServiceCollectionExtensions.cs`, replace the current authentication-builder block:
```csharp
services
    .AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme)
    .AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme, cookieOptions =>
    {
        // ... existing cookie config ...
    })
    .AddScheme<AuthenticationSchemeOptions, HubTokenAuthenticationHandler>(
        DashboardAuthenticationDefaults.HubAuthenticationScheme,
        _ => { });
```
with:
```csharp
// DEV/TEST ONLY. Read directly from configuration here because authentication scheme
// registration runs before options binding. Key mirrors DashboardOptions.DisableLogin.
bool disableLogin = configuration.GetValue<bool>("MxGateway:Dashboard:DisableLogin");

AuthenticationBuilder authentication =
    services.AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme);

if (disableLogin)
{
    // Register an always-authenticating handler UNDER the cookie scheme name, so the
    // Viewer/Admin/HubClients policies (which all resolve this scheme) authenticate
    // through it as the multi-role dev user — zero policy or page changes.
    authentication.AddScheme<AuthenticationSchemeOptions, DashboardAutoLoginAuthenticationHandler>(
        DashboardAuthenticationDefaults.AuthenticationScheme,
        _ => { });

    // Loud, once-at-startup warning (emitted when GatewayOptions is first resolved).
    services.AddOptions<GatewayOptions>().PostConfigure<ILoggerFactory>((gatewayOptions, loggerFactory) =>
        loggerFactory
            .CreateLogger("ZB.MOM.WW.MxGateway.Server.Dashboard.DisableLogin")
            .LogWarning(
                "DASHBOARD LOGIN DISABLED (MxGateway:Dashboard:DisableLogin=true) — every request is "
                + "authenticated as '{User}' with full permissions ({Roles}). Dev/test only; never "
                + "enable in production.",
                gatewayOptions.Dashboard.AutoLoginUser ?? DashboardAutoLoginAuthenticationHandler.DefaultUser,
                $"{DashboardRoles.Admin}, {DashboardRoles.Viewer}"));
}
else
{
    authentication.AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme, cookieOptions =>
    {
        // ... MOVE the existing cookie config body here unchanged ...
    });
}

authentication.AddScheme<AuthenticationSchemeOptions, HubTokenAuthenticationHandler>(
    DashboardAuthenticationDefaults.HubAuthenticationScheme,
    _ => { });
```
Notes for the implementer:
- Keep the existing `services.AddOptions<CookieAuthenticationOptions>(scheme).Configure(...)` block (RequireHttpsCookie / cookie-name) as-is. When `disableLogin` is on it configures an options object no handler reads — harmless dead config; not worth guarding.
- Required usings should already be present (`Microsoft.AspNetCore.Authentication`, `Microsoft.Extensions.Configuration`, `Microsoft.Extensions.Logging`, the `Configuration` namespace for `GatewayOptions`). Add any that are missing.
- `configuration.GetValue<bool>` defaults to `false` when the key is absent — preserves default-off.
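The default-off behavior in the last note can be checked in isolation. A minimal sketch, assuming the `Microsoft.Extensions.Configuration`, `.CommandLine`, and `.Binder` packages are referenced: `DisableLoginFlagDemo` and `ReadDisableLogin` are hypothetical names, and the real gateway composes its configuration from more sources than the command line.

```csharp
using System;
using Microsoft.Extensions.Configuration;

public static class DisableLoginFlagDemo
{
    // Hypothetical helper: builds configuration from command-line style args only,
    // then reads the flag the same way the registration code does.
    public static bool ReadDisableLogin(string[] args)
    {
        IConfiguration configuration = new ConfigurationBuilder()
            .AddCommandLine(args)
            .Build();

        // GetValue<bool> returns default(bool), i.e. false, when the key is absent.
        return configuration.GetValue<bool>("MxGateway:Dashboard:DisableLogin");
    }

    public static void Main()
    {
        Console.WriteLine(ReadDisableLogin(Array.Empty<string>()));
        Console.WriteLine(ReadDisableLogin(new[] { "--MxGateway:Dashboard:DisableLogin=true" }));
    }
}
```

The second call uses the same `--Key=value` argument form the integration tests pass to `GatewayApplication.Build`, so the flag can be flipped per run without editing any settings file.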
**Step 4: Run the tests, expect PASS** (all three).
**Step 5: Run the broader dashboard auth tests to confirm no regression:**
Run: `MSBUILDDISABLENODEREUSE=1 dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj --filter "FullyQualifiedName~Dashboard"`
Expected: all pass (existing `DashboardCookieOptionsTests`, `DashboardHubsRegistrationTests`, etc., still green — they build with the flag off).
> The startup warning is verified by inspection / manual run (`dotnet run … --MxGateway:Dashboard:DisableLogin=true` logs the warning once). It is not asserted automatically — capturing a startup log line would require injecting a log provider the `Build` harness does not expose, and the warning is a safety nicety, not core behavior.
**Step 6: Commit**
```bash
git add src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardDisableLoginTests.cs
git commit -m "feat(dashboard): swap to auto-login handler when DisableLogin is set"
```
---
### Task 4: Documentation
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** Task 2 (disjoint files — docs vs src/test)
**Files:**
- Modify: `docs/GatewayConfiguration.md`
- Modify: `docs/GatewayDashboardDesign.md`
- Modify: `CLAUDE.md`
**Step 1:** In `docs/GatewayConfiguration.md`, add `MxGateway:Dashboard:DisableLogin` (bool, default `false`) and `MxGateway:Dashboard:AutoLoginUser` (string, default `multi-role`) to the dashboard options section. Describe: dev/test only; auto-authenticates every request as `AutoLoginUser` with both roles; applies to all clients (not just loopback); never enable in production. Note it differs from `AllowAnonymousLocalhost` (which only bypasses authorization without minting a principal).
**Step 2:** In `docs/GatewayDashboardDesign.md`, document the auth-scheme swap: when the flag is on, the cookie handler is replaced by `DashboardAutoLoginAuthenticationHandler` under the same scheme name; explain *why* (every policy resolves that scheme, so no policy/page changes), and that it is dev/test only with a loud startup warning.
**Step 3:** In `CLAUDE.md`, in the Authentication section near the `Dashboard:AllowAnonymousLocalhost` sentence, add one sentence: `MxGateway:Dashboard:DisableLogin` (default off) auto-authenticates every dashboard request as `AutoLoginUser` (default `multi-role`) with all roles — dev/test only.
**Step 4: Commit**
```bash
git add docs/GatewayConfiguration.md docs/GatewayDashboardDesign.md CLAUDE.md
git commit -m "docs: document dashboard DisableLogin / AutoLoginUser dev flag"
```
---
## Verification (after all tasks)
```bash
MSBUILDDISABLENODEREUSE=1 dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj \
--filter "FullyQualifiedName~Dashboard|FullyQualifiedName~GatewayOptions"
```
Expected: all dashboard + options tests pass. (Known macOS-only failures `OrphanWorkerTerminatorTests` ×2 and the parallel-load `SqliteAuthStoreTests` TLS temp-file test are unrelated and out of this filter.)
Then `superpowers-extended-cc:finishing-a-development-branch` to merge/push.
@@ -0,0 +1,10 @@
{
"planPath": "docs/plans/2026-06-16-dashboard-disable-login.md",
"tasks": [
{"id": 136, "subject": "Task 1: Config fields on DashboardOptions", "status": "completed"},
{"id": 137, "subject": "Task 2: DashboardAutoLoginAuthenticationHandler + unit tests", "status": "completed", "blockedBy": [136]},
{"id": 138, "subject": "Task 3: Wire scheme swap + startup warning + wiring/authorization tests", "status": "completed", "blockedBy": [137]},
{"id": 139, "subject": "Task 4: Documentation", "status": "completed", "blockedBy": [136]}
],
"lastUpdated": "2026-06-16"
}
@@ -0,0 +1,171 @@
# Still-Pending §8 Completion — Design
> **Status:** Approved 2026-06-16. Next step: `superpowers-extended-cc:writing-plans`.
**Goal:** Close the actionable items in `stillpending.md` §8 ("Deferred test-coverage
follow-ups, never filed as findings") — the only Bucket-A work that is neither
vendor-gated nor live-rig-gated and is not already covered by the session-resilience
epic plan.
**Scope decision:** Bucket A only (actionable code/test work). The session-resilience
epic (Tasks 13–28) is already planned in `docs/plans/2026-06-15-session-resilience.md`
and is explicitly **out of scope** here — resume it separately. Vendor-gated
(§1.4/§3.4/§3.5) and live-rig/capture-gated (§1.3/§3.x/§5/§6.1) items cannot be
completed from this dev box and are out of scope.
**Approach:** "C" — the complete option, including new in-process gRPC test
infrastructure for the Java streaming/galaxy CLI commands and a full bounded
ready-wait in the gateway session hot path.
---
## Important correction (verified 2026-06-16)
The three §8 items cite findings marked **Resolved** in the review backlog, but those
resolutions did **not** survive into the current tree:
- The Java bulk-family CLI tests that `Client.Java-026` (resolved 2026-05-20) describes
were written against the old `com.dohertylan.mxgateway` package. After the rename to
`com.zb.mom.ww`, the current
`clients/java/zb-mom-ww-mxgateway-cli/.../MxGatewayCliTests.java` has **zero** coverage
for `read-bulk`, `write-bulk`, `write2-bulk`, `write-secured-bulk`,
`write-secured2-bulk`, `bench-read-bulk`, `stream-events`, `close-session`,
`galaxy-discover`, `galaxy-watch`. (`galaxy-test-connection`/`galaxy-last-deploy`/
`galaxy-browse`/`stream-alarms` **do** have tests now.)
- `Server-030` (both states in the not-ready diagnostic) **is** done — confirmed at
`src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs:1676`. The *deferred
follow-up* — should the gateway briefly wait for worker-Ready before failing fast? —
is genuinely unbuilt.
- `Tests-023` extracted a canonical `TestSupport/FakeWorkerProcess(int)`, yet three test
files still define private nested copies.
So §8's gap is real and current.
---
## Workstreams
Four independently landable workstreams.
| WS | Title | Files (language) | Classification | Depends on |
|----|-------|------------------|----------------|------------|
| A | Synchronous Java CLI tests (7 commands) | Java CLI test | small | — |
| B | In-process gRPC harness + streaming/galaxy CLI tests (3 commands) | Java CLI test + small CLI seam | standard | A (shares test file) |
| C | Worker-Ready bounded ready-wait | C# server session hot path | high-risk | — |
| D | `FakeWorkerProcess` consolidation | C# tests | small | — |
A, C, D are mutually independent (disjoint files/languages) and may be dispatched in
parallel. B follows A because both edit `MxGatewayCliTests.java`.
---
## WS-A — Synchronous Java CLI tests
**What:** Round-trip CLI tests for the 7 commands testable through the existing
`FakeSession`/`FakeClient` seam (the same seam `subscribe-bulk`/`write` already use):
`read-bulk`, `write-bulk`, `write2-bulk`, `write-secured-bulk`, `write-secured2-bulk`,
`bench-read-bulk`, `close-session`.
**How:** Upgrade `FakeSession` (currently returns empty lists) to per-call recorders
that capture the parsed entries (timeout, typed values via the shared `parseValue(type,
text)` switch, user-ids, timestamp) and synthesize one `BulkReadResult`/`BulkWriteResult`
per requested handle, so JSON-shape assertions exercise the
`bulkReadResultMap`/`bulkWriteResultMap` serializers. One `@Test` per command:
- `read-bulk`: `--timeout-ms` reaches session; JSON carries `tagAddress`/`itemHandle`/
`wasCached`/`quality`.
- `write-bulk`: `--type int32 --values 111,222 --user-id 5` parses through `parseValue`;
entries built with the expected typed `MxValue` + `userId`.
- `write2-bulk`: `--timestamp …Z` reaches the entry as `timestampValue`
(`hasTimestampValue()` true).
- `write-secured-bulk`: `--current-user-id`/`--verifier-user-id` both propagate.
- `write-secured2-bulk`: timestamp + both user-ids.
- `bench-read-bulk`: 1s steady / 0s warmup; assert cross-language schema keys
(`language=java`, `command=bench-read-bulk`, `totalCalls`, `successfulCalls`,
`failedCalls`, `callsPerSecond`, `latencyMs.p50/p95/p99`).
- `close-session`: `CloseSessionReply` round-trips through `FakeClient`.
**Verify:** `gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
---
## WS-B — In-process gRPC harness + streaming/galaxy CLI tests
**Why infra is required:** `MxEventStream` and `DeployEventStream` have package-private
constructors; `GalaxyRepositoryClient` is `final` with a static `connect()` and
`GalaxyCommand` has **no** injectable factory. None of `stream-events`/`galaxy-watch`/
`galaxy-discover` can be faked through the `FakeSession` seam.
**What:** A JUnit fixture that starts a gRPC **`InProcessServer`** hosting scripted
`MxAccessGateway` + `GalaxyRepository` service implementations and exposes an in-process
`Channel`. The **real** `MxGatewayClient`/`GalaxyRepositoryClient` connect to it, so the
real `MxEventStream`/`DeployEventStream` queue-draining and `GalaxyRepositoryClient`
paging are exercised end-to-end (highest fidelity; no reflection, no package hacks).
- **Production change (CLI module only, not the library):** add a `GalaxyClientFactory`
seam to `GalaxyCommand` mirroring the existing `MxGatewayCliClientFactory`, so galaxy
commands can target the in-process channel.
- `stream-events`: server streams a scripted `MxEvent` sequence → assert CLI render,
including the unsigned-uint64 worker-sequence regression.
- `galaxy-watch`: server streams scripted deploy events → assert CLI feed output.
- `galaxy-discover`: server returns a paged `GalaxyObject` hierarchy → assert CLI JSON.
The 7 synchronous commands stay on the lightweight `FakeSession` seam (YAGNI — no reason
to route them through a server).
**Verify:** `gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
---
## WS-C — Worker-Ready bounded ready-wait
**Problem:** `GetReadyWorkerClient` (`GatewaySession.cs:1665`) fails fast when the session
is `Ready` but the worker client's `WorkerClientState` has diverged (`Handshaking` after a
heartbeat blip, etc.). The both-states diagnostic exists; a brief wait does not.
**Constraint:** the check runs inside the `_syncRoot` lock — we cannot sleep/poll there.
**Design (pinned decisions):**
- New `GetReadyWorkerClientAsync`: read state under `_syncRoot`; **if** session is `Ready`
but worker is **transient** (`Handshaking`/`Created`), release the lock, poll at a short
interval (e.g. 25 ms) until the worker reaches `Ready` or a bounded timeout elapses, then
re-check under the lock.
- **Terminal worker states (`Faulted`/`Closing`/`Closed`/null) fail fast immediately** and
never wait; retrying a faulted worker is pointless and would mask the fault.
- New config `MxGateway:Sessions:WorkerReadyWaitTimeoutMs` on `SessionOptions`,
**default `0` = disabled** (preserves today's exact fail-fast behavior unless opted in),
validated `>= 0` by the options validator. Document in `docs/GatewayConfiguration.md`.
- The both-states diagnostic is preserved for the final failure. Callers at
`GatewaySession.cs:918` and `:1263` become `await`.
**Tests:**
- Handshaking→Ready within the timeout succeeds (worker invoked once).
- Faulted fails fast with both states in the message, zero waiting.
- Timeout elapses → fails with both states.
- Default `0` → unchanged fail-fast (no wait, no behavior change).
**Verify:** `dotnet test src/ZB.MOM.WW.MxGateway.Tests --filter "FullyQualifiedName~SessionManager"`
(plus the options-validator test class).
---
## WS-D — `FakeWorkerProcess` consolidation
**What:** Replace the private nested `FakeWorkerProcess` in
`SessionWorkerClientFactoryFakeWorkerTests`, `WorkerProcessLauncherTests`, and
`WorkerClientTests` with the canonical `TestSupport/FakeWorkerProcess(int)` (which already
has `MarkExited`/`Kill`/TCS-backed `WaitForExitAsync`). Where a nested copy carries extra
behavior the canonical lacks, fold that into the canonical first, then delete the copies.
**Verify:** `dotnet test src/ZB.MOM.WW.MxGateway.Tests --filter "FullyQualifiedName~WorkerClient | FullyQualifiedName~WorkerProcessLauncher | FullyQualifiedName~SessionWorkerClientFactory"`.
---
## Testing & sequencing
Per the targeted-test rule in `CLAUDE.md` (Source Update Workflow): each task runs only
its own filtered tests. Run the full gateway suite at most once, after WS-C + WS-D land.
Out-of-scope items remain recorded in `stillpending.md` (vendor/rig-gated) and the
session-resilience epic (`oldtasks.md`).
@@ -0,0 +1,327 @@
# Still-Pending §8 Completion Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans (or subagent-driven-development) to implement this plan task-by-task.
**Goal:** Close the three actionable `stillpending.md` §8 test-coverage follow-ups — Java CLI coverage for the 10 untested subcommands, the gateway Worker-Ready bounded ready-wait, and the `FakeWorkerProcess` de-duplication.
**Architecture:** Four independent workstreams. Java CLI tests split into a synchronous tier (existing `FakeSession` seam) and a streaming/galaxy tier (new in-process gRPC harness over the *real* client, using the public `Channel` constructors that already exist). C# work adds an opt-in bounded ready-wait in the session hot path (default off = no behavior change) and consolidates three duplicate test fakes onto the canonical `TestSupport/FakeWorkerProcess`.
**Tech Stack:** Java 21 + Gradle + picocli + grpc-java (`grpc-inprocess`, `grpc-testing`); .NET 10 + xUnit.
**Design doc:** `docs/plans/2026-06-16-stillpending-section8-design.md`. Branch: `feat/stillpending-section8`.
**Key facts verified during planning:**
- `clients/java/.../cli/MxGatewayCli.java`: `GatewayCommand` has a `clientFactory` (`MxGatewayCliClientFactory`) seam tests already override; `GalaxyCommand.connect()` (line ~368) calls the *static* `GalaxyRepositoryClient.connect(...)` with **no** injectable seam. The `FakeSession`/`FakeClient`/`FakeClientFactory` test doubles live in `MxGatewayCliTests.java` (~lines 636-984).
- `MxGatewayClient(Channel, MxGatewayClientOptions)` and `GalaxyRepositoryClient(Channel, MxGatewayClientOptions)` are **public** constructors (line 67 of each) — point them at an in-process channel, no library change needed.
- `grpc-inprocess` + `grpc-testing` are test deps in the **client** module only; the **cli** module's `build.gradle` needs them added.
- C#: option class is `SessionOptions` (`src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs`), config section `MxGateway:Sessions`, `{ get; init; }` style. `GetReadyWorkerClient()` is at `GatewaySession.cs:1665`; callers are `InvokeAsync` (`:918`, already async) and `ReadEventsAsync` (`:1263`, returns `IAsyncEnumerable` non-async — must become an async iterator).
**Out of scope:** Session-resilience epic (Tasks 13-28, see `docs/plans/2026-06-15-session-resilience.md`); vendor/rig-gated §1.3/§1.4/§3.x/§5/§6.1 items.
**Testing rule (CLAUDE.md):** Each task runs ONLY its own filtered tests. Full gateway suite at most once, after Tasks 8 + 9 land.
---
## Workstream / dependency overview
| Task | WS | Title | Class | Files | blockedBy | ∥ with |
|------|----|-------|-------|-------|-----------|--------|
| 1 | A | FakeSession recorders + read/write/write2-bulk tests | small | cli test | — | 3,4,7,9 |
| 2 | A | secured/secured2/bench-bulk + close-session tests | small | cli test | 1 | 3,4,7,9 |
| 3 | B | GalaxyClientFactory seam + cli grpc test deps | small | cli main + build.gradle | — | 1,2,4,7,8,9 |
| 4 | B | In-process gRPC harness fixture | standard | new cli test file | — | 1,2,3,7,8,9 |
| 5 | B | stream-events test via harness | standard | cli test | 2,4 | 7,8,9 |
| 6 | B | galaxy-watch + galaxy-discover tests via harness | standard | cli test | 3,5 | 7,8,9 |
| 7 | C | WorkerReadyWaitTimeoutMs option + validator + doc | small | C# config + doc | — | all Java |
| 8 | C | Bounded ready-wait in GatewaySession + tests | high-risk | C# server + test | 7 | all Java, 9 |
| 9 | D | FakeWorkerProcess consolidation | standard | C# tests | — | all |
---
## Task 1: FakeSession recorders + read-bulk / write-bulk / write2-bulk CLI tests
**Classification:** small
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 3, 4, 7, 9
**Files:**
- Test: `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java`
**Context:** `FakeSession` (a `MxGatewayCli.MxGatewayCliSession` impl, ~line 812) currently returns empty lists from `readBulk`/`writeBulk`/`write2Bulk`. Empty returns make CLI JSON-shape assertions vacuous. Mirror the existing `subscribeBulkCommandPrintsResults` test style (uses `FakeClientFactory` → `execute(...)` → asserts on captured stdout JSON).
**Step 1: Upgrade `FakeSession` to record + synthesize.** Add fields capturing the last call args (e.g. `lastReadBulkTimeoutMs`, `lastReadBulkItems`, `lastWriteBulkEntries`, `lastWrite2BulkEntries`) and change `readBulk`/`writeBulk`/`write2Bulk` to synthesize **one** result per requested handle: a `BulkReadResult` carrying `tagAddress`, `itemHandle`, `wasCached`, `quality`; a `BulkWriteResult` carrying the handle + an `Ok` status. Keep empty-list default only when no handles requested.
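A minimal sketch of the record-and-synthesize idea from Step 1, using only the JDK. `ReadRow` and `RecordingFakeSession` are hypothetical stand-ins for the real `BulkReadResult` and `FakeSession`; the handle and quality values are arbitrary placeholders, not values taken from the codebase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BulkReadResult; field names follow the plan text.
record ReadRow(String tagAddress, int itemHandle, boolean wasCached, int quality) {}

// Hypothetical recording fake: remembers the last call and returns one row per tag.
final class RecordingFakeSession {
    Integer lastReadBulkTimeoutMs;              // null until readBulk is called
    List<String> lastReadBulkItems = List.of();

    List<ReadRow> readBulk(List<String> tags, int timeoutMs) {
        lastReadBulkTimeoutMs = timeoutMs;
        lastReadBulkItems = List.copyOf(tags);
        List<ReadRow> rows = new ArrayList<>();
        for (int i = 0; i < tags.size(); i++) {
            // Deterministic placeholder values keep JSON-shape assertions stable.
            rows.add(new ReadRow(tags.get(i), i + 1, false, 192));
        }
        return rows; // empty only when no tags were requested
    }
}

final class RecordingFakeSessionDemo {
    public static void main(String[] args) {
        RecordingFakeSession session = new RecordingFakeSession();
        List<ReadRow> rows = session.readBulk(List.of("Tank1.Level", "Tank2.Level"), 750);
        System.out.println(session.lastReadBulkTimeoutMs + " -> " + rows.size() + " rows");
    }
}
```

Because every requested tag yields a row, a serializer bug that drops or renames a field shows up in the stdout JSON instead of being hidden by an empty array.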
**Step 2: Write the three failing tests:**
- `readBulkCommandForwardsTimeoutAndPrintsResults` — run `read-bulk` with `--timeout-ms 750` + two tags; assert `lastReadBulkTimeoutMs == 750` and the stdout JSON carries per-tag `tagAddress`/`itemHandle`/`wasCached`/`quality`.
- `writeBulkCommandParsesTypedValuesAndPrintsResults`: `--type int32 --values 111,222 --user-id 5`; assert entries parsed through the shared `parseValue` switch into typed `MxValue`s with `userId==5`, and JSON shows the `bulkWriteResultMap`.
- `write2BulkCommandForwardsTimestampAndPrintsResults`: `--timestamp 2026-05-20T00:00:00Z`; assert the entry's `hasTimestampValue()` is true.
**Step 3: Run them and confirm they fail** (empty/zero before the recorder upgrade):
`gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'` (from `clients/java`).
**Step 4: With the Step-1 recorder in place, run again — expect PASS.**
**Step 5: Commit.**
```bash
git add clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java
git commit -m "test(java-cli): cover read-bulk/write-bulk/write2-bulk round trips"
```
**Acceptance:** 3 new green tests; `FakeSession` records args and returns one row per handle.
---
## Task 2: secured / secured2 / bench-read-bulk + close-session CLI tests
**Classification:** small
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 3, 4, 7, 9
**blockedBy:** Task 1 (same test file + shared `FakeSession` recorders)
**Files:**
- Test: `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java`
**Step 1: Extend `FakeSession`/`FakeClient` recorders** for `writeSecuredBulk`/`writeSecured2Bulk` (capture `currentUserId`/`verifierUserId`) and add a `CloseSessionReply` recorder to `FakeClient` for `closeSession`.
**Step 2: Write the four failing tests:**
- `writeSecuredBulkCommandForwardsUserIdsAndPrintsResults`: `--current-user-id 7 --verifier-user-id 8`; assert both propagate.
- `writeSecured2BulkCommandForwardsTimestampAndUserIdsAndPrintsResults` — timestamp + both user-ids.
- `benchReadBulkCommandEmitsJsonSchemaKeys`: `--duration-seconds 1 --warmup-seconds 0`; assert the JSON contains `language=java`, `command=bench-read-bulk`, `bulkSize`, `totalCalls`, `successfulCalls`, `failedCalls`, `callsPerSecond`, `latencyMs.p50/p95/p99`, and the synthesized `tags`. Assert schema keys, NOT numeric values.
- `closeSessionCommandPrintsReply` — assert the `CloseSessionReply` round-trips to stdout.
**Step 3-4: Run failing → implement recorders → run PASS** (same gradle command as Task 1, narrowed `--tests '*MxGatewayCliTests'`).
**Step 5: Commit** `test(java-cli): cover secured/secured2/bench bulk + close-session`.
**Acceptance:** 4 new green tests; bench test pins schema keys only.
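One way to pin schema keys without pinning timing-dependent numbers is sketched below. It assumes the bench JSON has already been parsed into a `Map`; the `BenchSchemaCheck` helper is hypothetical, and only the key names come from the test description above.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: reports which expected keys are absent, ignoring all values.
final class BenchSchemaCheck {
    static final Set<String> TOP_LEVEL = Set.of(
            "language", "command", "bulkSize", "totalCalls", "successfulCalls",
            "failedCalls", "callsPerSecond", "latencyMs", "tags");
    static final Set<String> LATENCY = Set.of("p50", "p95", "p99");

    // Missing keys are returned sorted (nested ones dotted) for stable failure messages.
    static Set<String> missingKeys(Map<String, Object> report) {
        Set<String> missing = new TreeSet<>();
        for (String key : TOP_LEVEL) {
            if (!report.containsKey(key)) missing.add(key);
        }
        if (report.get("latencyMs") instanceof Map<?, ?> latency) {
            for (String key : LATENCY) {
                if (!latency.containsKey(key)) missing.add("latencyMs." + key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> report = Map.of(
                "language", "java", "command", "bench-read-bulk", "bulkSize", 2,
                "totalCalls", 10, "successfulCalls", 10, "failedCalls", 0,
                "callsPerSecond", 9.7, "latencyMs", Map.of("p50", 1.0, "p95", 2.0),
                "tags", List.of("A", "B"));
        System.out.println(missingKeys(report)); // prints [latencyMs.p99]
    }
}
```

A test built this way stays green on slow or loaded machines, since throughput and latency values never enter the assertion.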
---
## Task 3: GalaxyClientFactory seam + cli grpc test deps
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 1, 2, 4, 7, 8, 9
**Files:**
- Modify: `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java` (`GalaxyCommand`, ~lines 361-371; `connect()` ~line 368)
- Modify: `clients/java/zb-mom-ww-mxgateway-cli/build.gradle`
**Context:** `GalaxyCommand.connect()` hard-calls `GalaxyRepositoryClient.connect(...)`. Mirror the existing `MxGatewayCliClientFactory` seam so tests can supply an in-process-backed client.
**Step 1: Add the seam.** Introduce an interface `GalaxyClientFactory { GalaxyRepositoryClient connect(MxGatewayClientOptions options); }`, give `GalaxyCommand` a `final GalaxyClientFactory galaxyClientFactory` field (constructor-injected, defaulting to `GalaxyRepositoryClient::connect` for production wiring), and change `connect()` to delegate to it. Thread the factory through the picocli command construction the same way `clientFactory` is threaded for gateway commands. Keep the default production path identical (no behavior change).
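The shape of the seam can be sketched with stand-in types. `Options` and `GalaxyClient` below are hypothetical placeholders for `MxGatewayClientOptions` and `GalaxyRepositoryClient`, and the picocli wiring is left out; this is an illustration of the pattern, not the actual CLI code.

```java
// Hypothetical stand-in for MxGatewayClientOptions.
record Options(String endpoint) {}

// Hypothetical stand-in for GalaxyRepositoryClient, including its static connect().
class GalaxyClient {
    final String target;

    GalaxyClient(String target) {
        this.target = target;
    }

    static GalaxyClient connect(Options options) {
        return new GalaxyClient("network:" + options.endpoint());
    }
}

// The seam: a single-method interface, so a method reference or lambda satisfies it.
@FunctionalInterface
interface GalaxyClientFactory {
    GalaxyClient connect(Options options);
}

class GalaxyCommand {
    private final GalaxyClientFactory galaxyClientFactory;

    GalaxyCommand(GalaxyClientFactory galaxyClientFactory) {
        this.galaxyClientFactory = galaxyClientFactory;
    }

    // Delegates instead of hard-calling the static connect().
    GalaxyClient connect(Options options) {
        return galaxyClientFactory.connect(options);
    }
}

final class GalaxySeamDemo {
    public static void main(String[] args) {
        // Production wiring passes the static method reference, so behavior is unchanged.
        GalaxyCommand production = new GalaxyCommand(GalaxyClient::connect);
        // Test wiring substitutes a client backed by something else entirely.
        GalaxyCommand underTest = new GalaxyCommand(o -> new GalaxyClient("in-process"));
        System.out.println(production.connect(new Options("host:5001")).target);
        System.out.println(underTest.connect(new Options("ignored")).target);
    }
}
```

Keeping the interface to one method means the production default costs a single method reference and tests need no subclassing of a `final` client.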
**Step 2: Add test deps** to `clients/java/zb-mom-ww-mxgateway-cli/build.gradle`:
```gradle
testImplementation "io.grpc:grpc-inprocess:${grpcVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
```
**Step 3: Build to confirm wiring compiles + production galaxy commands still resolve:**
`gradle :zb-mom-ww-mxgateway-cli:compileJava :zb-mom-ww-mxgateway-cli:compileTestJava` (from `clients/java`).
**Step 4: Run the existing galaxy CLI tests to confirm no regression:**
`gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
**Step 5: Commit** `feat(java-cli): inject GalaxyClientFactory seam; add grpc inprocess test deps`.
**Acceptance:** Seam present, default wiring unchanged, existing galaxy tests green, in-process deps available to the test source set.
---
## Task 4: In-process gRPC harness fixture
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 1, 2, 3, 7, 8, 9
**Files:**
- Create: `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/InProcessGatewayHarness.java`
**Context:** Streaming/galaxy commands can't use `FakeSession` (real `MxEventStream`/`DeployEventStream` package-private ctors; `GalaxyRepositoryClient` final). Drive the *real* client over an in-process channel against scripted fake services. The public `MxGatewayClient(Channel, options)` / `GalaxyRepositoryClient(Channel, options)` ctors make this clean.
**Step 1: Build the fixture** — `AutoCloseable`, unique server name per instance:
- Start `InProcessServerBuilder.forName(name).directExecutor().addService(fakeGateway).addService(fakeGalaxy).build().start()`.
- Expose `ManagedChannel channel()` via `InProcessChannelBuilder.forName(name).directExecutor().build()`.
- `fakeGateway` extends `MxAccessGatewayGrpc.MxAccessGatewayImplBase`, overriding `streamEvents` (push a scripted `List<MxEvent>` to the `StreamObserver`, then `onCompleted`) and `closeSession`. `fakeGalaxy` extends `GalaxyRepositoryGrpc.GalaxyRepositoryImplBase`, overriding `discoverHierarchy` (return a small paged `GalaxyObject` set) and `watchDeployEvents` (stream scripted deploy events). Make the scripted payloads settable on the harness (constructor args or setters).
- Provide helpers: `MxGatewayClient gatewayClient()` → `new MxGatewayClient(channel(), testOptions())`; `GalaxyRepositoryClient galaxyClient()` → `new GalaxyRepositoryClient(channel(), testOptions())`, where `testOptions()` builds an `MxGatewayClientOptions` with a dummy api-key.
- `close()` shuts down channel + server.
**Step 2: Smoke-verify the harness in isolation** — add a temporary `@Test` (or a tiny self-test) that opens the harness, calls `gatewayClient()` and streams one scripted event, then delete it before commit. Build:
`gradle :zb-mom-ww-mxgateway-cli:compileTestJava`.
**Step 3: Run the cli test set to confirm nothing breaks:**
`gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
**Step 4: Commit** `test(java-cli): add in-process gRPC harness fixture`.
**Acceptance:** Compiles; harness starts/stops cleanly; scripted services reachable through the real client types. (No assertions on CLI yet — that's Tasks 5-6.)
---
## Task 5: stream-events CLI test via harness
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 7, 8, 9
**blockedBy:** Task 2 (same test file ordering), Task 4 (harness)
**Files:**
- Test: `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java`
**Step 1: Wire a harness-backed `MxGatewayCliClientFactory`** in the test that builds the CLI client over `harness.gatewayClient()` (reuse the production adapter that wraps `MxGatewayClient` as `MxGatewayCliClient`; it is package-visible from the test's package). Script ≥2 `MxEvent`s including one with a **high uint64 worker sequence** to cover the unsigned-format regression.
**Step 2: Write the failing test** `streamEventsRendersScriptedEventsIncludingHighUint64Sequence` — run `stream-events`, assert stdout contains the scripted event fields and the high sequence renders unsigned (not negative).
**Step 3: Run failing → (harness already supplies behavior) → PASS:**
`gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
**Step 4: Commit** `test(java-cli): cover stream-events over in-process harness`.
**Acceptance:** stream-events exercised through the real `MxEventStream`; unsigned-sequence rendering asserted.
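For context on the unsigned-sequence assertion: protobuf `uint64` fields surface in Java as a signed `long`, so values at or above 2^63 print as negative unless rendered with the unsigned helpers. The sketch below shows the difference with plain JDK calls; the `render` helper is hypothetical and is not the CLI's actual formatting code.

```java
final class UnsignedSequenceDemo {
    // Renders a uint64 carried in a Java long without treating the top bit as a sign.
    static String render(long workerSequence) {
        return Long.toUnsignedString(workerSequence);
    }

    public static void main(String[] args) {
        // 2^64 - 1 has the same bit pattern as -1L.
        long high = Long.parseUnsignedLong("18446744073709551615");
        System.out.println(Long.toString(high)); // prints -1
        System.out.println(render(high));        // prints 18446744073709551615
    }
}
```

A scripted event whose sequence has the top bit set is enough to tell the two renderings apart in the test's stdout assertion.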
---
## Task 6: galaxy-watch + galaxy-discover CLI tests via harness
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 7, 8, 9
**blockedBy:** Task 3 (GalaxyClientFactory seam), Task 5 (same test file ordering)
**Files:**
- Test: `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java`
**Step 1: Wire a harness-backed `GalaxyClientFactory`** (from Task 3) that returns `harness.galaxyClient()`.
**Step 2: Write the failing tests:**
- `galaxyDiscoverPrintsPagedHierarchyJson` — assert the scripted `GalaxyObject` hierarchy renders in CLI JSON (object fields + counts).
- `galaxyWatchRendersScriptedDeployEvents` — assert the scripted deploy events render in the CLI feed; honor `--limit` if the command supports it.
**Step 3: Run failing → PASS:** `gradle :zb-mom-ww-mxgateway-cli:test --tests '*MxGatewayCliTests'`.
**Step 4: Full cli module verification** (this closes the §8 Java item): `gradle :zb-mom-ww-mxgateway-cli:test`.
**Step 5: Commit** `test(java-cli): cover galaxy-discover/galaxy-watch over in-process harness`.
**Acceptance:** All 10 previously-untested subcommands now have CLI coverage; `MxGatewayCliTests` green.
---
## Task 7: WorkerReadyWaitTimeoutMs option + validator + doc
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** all Java tasks (1-6), Task 9
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs`
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs`
- Modify: `docs/GatewayConfiguration.md`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Configuration/GatewayOptionsTests.cs` (or the existing options-validator test class)
**Step 1: Add the option** to `SessionOptions` (match the `{ get; init; }` + XML-doc style):
```csharp
/// <summary>
/// Gets the bounded time, in milliseconds, the gateway will wait for a worker client
/// to reach <c>Ready</c> when the session itself is already <c>Ready</c> but the worker
/// state has transiently diverged (e.g. <c>Handshaking</c> after a heartbeat blip).
/// The wait applies only to transient worker states; terminal states
/// (<c>Faulted</c>/<c>Closing</c>/<c>Closed</c>/no worker) fail fast immediately.
/// A value of <c>0</c> (the default) disables the wait — the gateway keeps the original
/// fail-fast behavior. Must be greater than or equal to zero.
/// </summary>
public int WorkerReadyWaitTimeoutMs { get; init; }
```
**Step 2: Validate `>= 0`** in `GatewayOptionsValidator` (mirror an existing numeric check; message e.g. `MxGateway:Sessions:WorkerReadyWaitTimeoutMs must be greater than or equal to zero.`).
**Step 3: Document** the new key in `docs/GatewayConfiguration.md` under the `MxGateway:Sessions` section (default 0 = disabled; transient-only; terminal fails fast).
**Step 4: Write + run the failing test** asserting default is `0` and a negative value fails validation:
`dotnet test src/ZB.MOM.WW.MxGateway.Tests --filter "FullyQualifiedName~GatewayOptions"` → expect FAIL pre-impl, PASS post.
**Step 5: Commit** `feat(server): add MxGateway:Sessions:WorkerReadyWaitTimeoutMs (default off)`.
**Acceptance:** Option binds, default 0, negative rejected, doc updated.
---
## Task 8: Bounded ready-wait in GatewaySession + tests
**Classification:** high-risk
**Estimated implement time:** ~5 min (split if it grows)
**Parallelizable with:** all Java tasks, Task 9
**blockedBy:** Task 7
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (`GetReadyWorkerClient` `:1665`; `InvokeAsync` `:918`; `ReadEventsAsync` `:1263`)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` (or the test class covering `GetReadyWorkerClient` diagnostics)
**Context & constraints:** The not-ready check runs inside the `_syncRoot` lock — **never sleep/poll inside the lock**. Read state under the lock, release, await, re-check. The both-states diagnostic (`Session state is {_state}; worker state is {workerState}.`) MUST be preserved for the final failure. Default timeout 0 ⇒ behavior identical to today.
**Step 1: Write failing tests first (TDD)** in the session-manager test class, using a `FakeWorkerClient` whose `State` is settable:
- `InvokeAsync_WhenWorkerHandshakingThenReadyWithinTimeout_Succeeds` — option `WorkerReadyWaitTimeoutMs=500`; worker starts `Handshaking`, flips to `Ready` after ~50 ms (e.g. via a background `Task` or a `TimeProvider`-driven advance); assert the invoke succeeds and the worker is invoked once.
- `InvokeAsync_WhenWorkerFaulted_FailsFastWithBothStates` — worker `Faulted`, timeout 500; assert it throws *immediately* (no meaningful delay) and the message contains both `Session state is Ready` and `worker state is Faulted`.
- `InvokeAsync_WhenTimeoutElapsesStillNotReady_FailsWithBothStates` — worker stays `Handshaking`, timeout small (e.g. 100 ms); assert throw after ~timeout with both states.
- `InvokeAsync_WhenTimeoutZero_FailsFastUnchanged` — worker `Handshaking`, timeout 0; assert immediate fail-fast (pins the no-behavior-change default).
**Step 2: Run tests → expect FAIL/compile-error** (`GetReadyWorkerClientAsync` not present):
`dotnet test src/ZB.MOM.WW.MxGateway.Tests --filter "FullyQualifiedName~SessionManager"`.
**Step 3: Implement `GetReadyWorkerClientAsync(CancellationToken)`:**
- Under `_syncRoot`: capture `_state` and `_workerClient?.State`. If session is `Ready` and worker is `Ready` → return it (fast path, no await). If worker is terminal (`Faulted`/`Closing`/`Closed`) or null, or session not `Ready` → throw the both-states `SessionManagerException` now (fail fast). If worker is transient (`Handshaking`/`Created`) AND `WorkerReadyWaitTimeoutMs > 0` → fall through to the wait.
- Wait loop OUTSIDE the lock: until a deadline (`now + WorkerReadyWaitTimeoutMs`), `await Task.Delay(pollIntervalMs, ct)` (const `pollIntervalMs = 25`), then re-acquire `_syncRoot` and re-evaluate: Ready → return; terminal/null/session-not-Ready → fail fast with both states; still transient → keep waiting. On deadline → throw both-states.
- Keep the existing synchronous `GetReadyWorkerClient()` for any non-async caller, or have it delegate to a zero-wait evaluation to avoid duplicated message logic (extract a private `EvaluateReadyUnderLock(out string failureMessage)` helper used by both).
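The control flow in Step 3 can be sketched as follows. The gateway itself is C#; the sketch is written in Java and uses hypothetical names (`ReadyWait`, `WorkerState`, a `Supplier` standing in for the worker client's state). It assumes the session is already `Ready` and models only the worker side, so treat it as an illustration of the lock/poll/re-check pattern and not as the implementation.

```java
import java.util.function.Supplier;

enum WorkerState { CREATED, HANDSHAKING, READY, FAULTED, CLOSING, CLOSED }

final class ReadyWait {
    private static final long POLL_INTERVAL_MS = 25;
    private final Object syncRoot = new Object();
    private final Supplier<WorkerState> workerState; // a null result models "no worker"
    private final long timeoutMs;                    // 0 disables the wait

    ReadyWait(Supplier<WorkerState> workerState, long timeoutMs) {
        this.workerState = workerState;
        this.timeoutMs = timeoutMs;
    }

    private static boolean isTransient(WorkerState s) {
        return s == WorkerState.CREATED || s == WorkerState.HANDSHAKING;
    }

    // Returns when the worker is READY; throws IllegalStateException otherwise.
    void awaitReady() {
        // One deadline computed up front from a monotonic clock.
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            WorkerState state;
            synchronized (syncRoot) {
                state = workerState.get(); // state is read under the lock only
            }
            if (state == WorkerState.READY) return;
            boolean mayWait = timeoutMs > 0 && isTransient(state)
                    && System.nanoTime() - deadline < 0;
            if (!mayWait) {
                // Terminal state, no worker, wait disabled, or deadline passed.
                throw new IllegalStateException(
                        "Session state is Ready; worker state is " + state + ".");
            }
            try {
                Thread.sleep(POLL_INTERVAL_MS); // the lock is not held while sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for worker Ready.", e);
            }
        }
    }
}
```

With `timeoutMs` set to `0` the loop body runs exactly once and never sleeps, which matches the requirement that the default preserves fail-fast behavior.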
**Step 4: Update callers:**
- `InvokeAsync` (`:918`): `IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);`.
- `ReadEventsAsync` (`:1263`): convert to an async iterator — `public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync([EnumeratorCancellation] CancellationToken cancellationToken)`, `await GetReadyWorkerClientAsync(...)`, `TouchClientActivity(...)`, then `await foreach (var e in workerClient.ReadEventsAsync(cancellationToken)) yield return e;`. Verify no caller relied on eager (pre-enumeration) throw semantics — if one does, note it for the reviewer.
**Step 5: Run the targeted tests → PASS** (same filter). Confirm the 4 new tests + pre-existing `GetReadyWorkerClient` diagnostic test all pass.
**Step 6: Commit** `feat(server): bounded worker-ready wait in GatewaySession (default off)`.
**Acceptance:** Transient states wait up to the timeout; terminal states fail fast with both states; default 0 is byte-for-byte the old behavior; no sleeping under `_syncRoot`.
---
## Task 9: FakeWorkerProcess consolidation
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** all tasks
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs` (canonical — extend if needed)
- Modify: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionWorkerClientFactoryFakeWorkerTests.cs` (nested copy ~line 343)
- Modify: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs` (nested copy ~line 244)
- Modify: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs` (nested copy ~line 767; already `using ...TestSupport`)
**Context:** Canonical `TestSupport/FakeWorkerProcess(int)` has `MarkExited`/`Kill`/TCS-backed `WaitForExitAsync`. Three test files still declare private nested `FakeWorkerProcess`. Consolidate.
**Step 1: Diff each nested copy against the canonical.** For each, list any members/behavior the canonical lacks (e.g. extra counters, scripted exit codes). Fold those into the canonical `TestSupport/FakeWorkerProcess` **first** (additively, so existing canonical users keep compiling).
**Step 2: Delete each nested class** and update references to the canonical type; add/confirm `using ZB.MOM.WW.MxGateway.Tests.TestSupport;`.
**Step 3: Run the three affected test classes:**
```
dotnet test src/ZB.MOM.WW.MxGateway.Tests --filter "FullyQualifiedName~WorkerClientTests|FullyQualifiedName~WorkerProcessLauncherTests|FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests"
```
Expected: all pass (behavior preserved; `KillCount`/`HasExited`/`ExitCode` semantics intact).
**Step 4: Commit** `refactor(tests): consolidate FakeWorkerProcess onto TestSupport canonical`.
**Acceptance:** Exactly one `FakeWorkerProcess` definition (the canonical); three files import it; affected tests green.
---
## Final verification (after Tasks 8 + 9)
Run the full gateway suite once to confirm no cross-cutting regression:
```
dotnet test src/ZB.MOM.WW.MxGateway.Tests/ZB.MOM.WW.MxGateway.Tests.csproj
```
Expected baseline: prior green count + the new Task 7/8/9 tests; the 3 known macOS-environmental failures (TLS temp-file, OrphanWorkerTerminator ×2) may persist — confirm no *new* failures.
Then finish via `superpowers-extended-cc:finishing-a-development-branch`.
@@ -0,0 +1,15 @@
{
"planPath": "docs/plans/2026-06-16-stillpending-section8.md",
"tasks": [
{"id": 140, "subject": "Task 1: FakeSession recorders + read/write/write2-bulk CLI tests", "status": "pending"},
{"id": 141, "subject": "Task 2: secured/secured2/bench-bulk + close-session CLI tests", "status": "pending", "blockedBy": [140]},
{"id": 142, "subject": "Task 3: GalaxyClientFactory seam + cli grpc test deps", "status": "pending"},
{"id": 143, "subject": "Task 4: In-process gRPC harness fixture", "status": "pending"},
{"id": 144, "subject": "Task 5: stream-events CLI test via harness", "status": "pending", "blockedBy": [141, 143]},
{"id": 145, "subject": "Task 6: galaxy-watch + galaxy-discover CLI tests via harness", "status": "pending", "blockedBy": [142, 144]},
{"id": 146, "subject": "Task 7: WorkerReadyWaitTimeoutMs option + validator + doc", "status": "pending"},
{"id": 147, "subject": "Task 8: Bounded ready-wait in GatewaySession + tests", "status": "pending", "blockedBy": [146]},
{"id": 148, "subject": "Task 9: FakeWorkerProcess consolidation", "status": "pending"}
],
"lastUpdated": "2026-06-16"
}
@@ -0,0 +1,68 @@
# Saved Task List — Session Resilience Epic
> Snapshot taken 2026-06-16, before switching to the dashboard disable-login feature.
> This is the in-flight epic from `docs/plans/2026-06-15-session-resilience.md`.
## How to resume
```
/superpowers-extended-cc:executing-plans docs/plans/2026-06-15-session-resilience.md
```
The authoritative resume state lives in
`docs/plans/2026-06-15-session-resilience.md.tasks.json` (tasks 1-12 completed,
13-28 pending). This file is just a human-readable mirror.
## Status
**12 of 28 tasks complete** (Phases 1-2 + reconnect core of Phase 3). All completed
work is merged to `main` (commit `c446bef`, pushed to origin).
### Completed — Phase 1 (Foundation)
- ✅ Task 1 (#108): Add OwnerKeyId to the session
- ✅ Task 2 (#109): SessionEventDistributor skeleton
- ✅ Task 3 (#110): Bounded replay ring buffer
- ✅ Task 4 (#111): Rewire AttachEventSubscriber + EventStreamService onto distributor
- ✅ Task 5 (#112): Per-subscriber backpressure isolation
- ✅ Task 6 (#113): Dashboard broadcaster becomes a distributor subscriber
### Completed — Phase 2 (Multi-subscriber fan-out)
- ✅ Task 7 (#114): Remove validator block + add subscriber cap option
- ✅ Task 8 (#115): Subscriber-lease collection + cap enforcement
- ✅ Task 9 (#116): Multi-subscriber end-to-end test (FakeWorkerHarness)
### Completed — Phase 3 (Reconnect core)
- ✅ Task 10 (#117): Proto — ReplayGap signal
- ✅ Task 11 (#118): Detach-grace session retention
- ✅ Task 12 (#119): Replay-on-reconnect + emit ReplayGap
### Pending — Phase 3 finish
- ⏳ Task 13 (#120): Owner re-validation on reconnect — blockedBy 12, 1
- ⏳ Task 14 (#121): Client ReplayGap handling — all 5 clients — blockedBy 10
  - Carry the per-language presence-check idiom note for `optional` message fields.
- ⏳ Task 15 (#122): Reconnect integration test (fake worker) — blockedBy 12
### Pending — Phase 4 (Per-session dashboard ACL)
- ⏳ Task 16 (#123): gRPC session-owner gate + all-sessions admin scope — blockedBy 9, 1
- ⏳ Task 17 (#124): Session Tag + dashboard group-to-tag config — blockedBy 9
- ⏳ Task 18 (#125): EventsHub per-session ACL + hub-token tag claim — blockedBy 17
  - Open decision: Viewer default (admin-sees-all vs strict per-session).
- ⏳ Task 19 (#126): ACL tests incl. live LDAP users — blockedBy 18
### Pending — Phase 5 (Orphan-worker reattach)
- ⏳ Task 20 (#127): Stable gateway-instance id + stable pipe naming — blockedBy 19
- ⏳ Task 21 (#128): Adoption manifest store (SQLite) — blockedBy 20
- ⏳ Task 22 (#129): Proto — worker adopt/reconnect frame — blockedBy 21
- ⏳ Task 23 (#130): Worker phone-home reconnect loop + self-terminate — blockedBy 22 (net48/x86, windev)
- ⏳ Task 24 (#131): Gateway adoption — re-open pipes, nonce-validate, reject impostors — blockedBy 23
- ⏳ Task 25 (#132): Resync adopted worker + ReplayGap to subscribers — blockedBy 24, 12
- ⏳ Task 26 (#133): EnableOrphanReattach flag (default off) + terminator fallback — blockedBy 24
- ⏳ Task 27 (#134): Gateway-restart reattach round-trip (WINDEV + live worker) — blockedBy 25, 26
- ⏳ Task 28 (#135): Documented-rule reversals + stillpending refresh — blockedBy 27
## Notes
- Phase 5 reverses the documented "Gateway restart does not reattach orphan workers"
rule (CLAUDE.md) — this was explicitly approved during design.
- Two deferred follow-ups noted earlier: dashboard visibility of `DetachedAtUtc` on
`DashboardSessionSummary`.
- Worker (net48/x86) tasks build/test on windev; everything else builds on macOS.
@@ -715,6 +715,23 @@ message MxEvent {
google.protobuf.Timestamp gateway_receive_timestamp = 11;
optional int32 hresult = 12;
string raw_status = 13;
// Gateway-synthesized reconnect-replay gap signal. Set ONLY on the single
// sentinel MxEvent the gateway emits at the head of a StreamEvents stream
// that was resumed via StreamEventsRequest.after_worker_sequence when the
// requested sequence is older than the oldest event still retained in the
// session replay ring (i.e. events were evicted and cannot be replayed).
// On that sentinel, `family` is UNSPECIFIED, the `body` oneof is unset, and
// no per-item fields (server_handle/item_handle/value/...) are populated;
// clients MUST treat a present `replay_gap` as "you missed events; discard
// local state and re-snapshot" and read `requested_after_sequence` /
// `oldest_available_sequence` from it. Unset on every normal MXAccess event.
// This field is ONLY ever set on events returned from the StreamEvents server
// stream; it is ALWAYS unset on events in DrainEventsReply (the diagnostic
// drain path never emits the sentinel).
// Additive (proto3): existing clients that ignore this field continue to
// deserialize the stream unchanged. (Reconnect/replay logic is Task 12; this
// is the contract surface only.)
optional ReplayGap replay_gap = 14;
oneof body {
OnDataChangeEvent on_data_change = 20;
@@ -726,6 +743,27 @@ message MxEvent {
}
}
// Reconnect-replay gap signal carried by a sentinel MxEvent (MxEvent.replay_gap)
// when a client resumes StreamEvents via after_worker_sequence but the requested
// sequence predates the oldest event still held in the session replay ring.
// The events in the open interval (requested_after_sequence, oldest_available_sequence)
// were evicted from the ring and cannot be replayed, so the client must
// re-snapshot rather than assume a contiguous event history.
message ReplayGap {
// The worker_sequence the client asked to resume after
// (StreamEventsRequest.after_worker_sequence).
uint64 requested_after_sequence = 1;
// The oldest worker_sequence still retained in the replay ring and available
// for replay. Events with worker_sequence in the open interval
// (requested_after_sequence, oldest_available_sequence) were evicted and are
// unrecoverable. oldest_available_sequence itself IS still retained: a client
// that wishes to resume without incurring another gap MUST set
// after_worker_sequence = oldest_available_sequence - 1 in the next
// StreamEventsRequest, which will cause the server to replay starting at
// oldest_available_sequence (the first retained event).
uint64 oldest_available_sequence = 2;
}
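The arithmetic in the `ReplayGap` comments can be sketched as a pair of pure functions. This is an illustration only: `ReplayGapMath` and both method names are hypothetical, and treating `oldest_available_sequence == 0` as "nothing retained, start fresh" is an assumption taken from the `OldestAvailableSequence` documentation elsewhere in this diff.

```csharp
using System;

// Hypothetical client-side helper; not part of any gateway client library.
public static class ReplayGapMath
{
    // Number of events in the open interval (requestedAfter, oldestAvailable),
    // i.e. the events that were evicted and cannot be replayed.
    public static ulong MissedEventCount(ulong requestedAfter, ulong oldestAvailable)
        => oldestAvailable > requestedAfter ? oldestAvailable - requestedAfter - 1 : 0;

    // Value to send as after_worker_sequence so that replay starts at
    // oldestAvailable itself. Assumption: 0 means nothing is retained, so the
    // sketch returns 0 instead of underflowing.
    public static ulong NextResumeAfter(ulong oldestAvailable)
        => oldestAvailable == 0 ? 0 : oldestAvailable - 1;
}
```

For example, a client that asked to resume after sequence 10 and receives `oldest_available_sequence = 50` has lost 39 events (11 through 49) and would send 49 on its next request. Note one edge case: when the oldest retained sequence is 1 the helper returns 0, which the `EventStreamService` hunk in this diff treats as a fresh stream with no replay.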
enum MxEventFamily {
MX_EVENT_FAMILY_UNSPECIFIED = 0;
MX_EVENT_FAMILY_ON_DATA_CHANGE = 1;
@@ -8,6 +8,23 @@ public sealed class DashboardOptions
/// <summary>Gets whether anonymous localhost access to dashboard is allowed.</summary>
public bool AllowAnonymousLocalhost { get; init; } = true;
/// <summary>
/// DEV/TEST ONLY. When true, the dashboard bypasses the login form entirely and
/// auto-authenticates EVERY request as <see cref="AutoLoginUser"/> holding both
/// dashboard roles (Administrator + Viewer). No cookie, no LDAP bind. Default false.
/// Unlike <see cref="AllowAnonymousLocalhost"/> (which only succeeds the authorization
/// requirement without authenticating), this mints a real principal, so the UI behaves
/// as a signed-in admin and applies to all clients (not just loopback). Never enable in
/// production. See docs/plans/2026-06-16-dashboard-disable-login-design.md.
/// </summary>
public bool DisableLogin { get; init; }
/// <summary>
/// Username minted for the auto-login principal when <see cref="DisableLogin"/> is true.
/// Null/blank falls back to the GLAuth Administrator test user <c>multi-role</c>.
/// </summary>
public string? AutoLoginUser { get; init; }
/// <summary>
/// When true (default), the dashboard auth cookie is restricted to HTTPS
/// requests via <see cref="Microsoft.AspNetCore.Http.CookieSecurePolicy.Always"/>.
@@ -6,5 +6,6 @@ public sealed record EffectiveSessionConfiguration(
int MaxPendingCommandsPerSession,
int DefaultLeaseSeconds,
int LeaseSweepIntervalSeconds,
int DetachGraceSeconds,
bool AllowMultipleEventSubscribers,
int MaxEventSubscribersPerSession);
@@ -46,6 +46,7 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
DetachGraceSeconds: value.Sessions.DetachGraceSeconds,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
Events: new EffectiveEventConfiguration(
@@ -181,6 +181,23 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
options.MaxEventSubscribersPerSession,
"MxGateway:Sessions:MaxEventSubscribersPerSession must be greater than zero.",
builder);
AddIfNegative(
options.DetachGraceSeconds,
"MxGateway:Sessions:DetachGraceSeconds must be zero or greater (0 disables detach-grace retention).",
builder);
AddIfNegative(
options.WorkerReadyWaitTimeoutMs,
"MxGateway:Sessions:WorkerReadyWaitTimeoutMs must be greater than or equal to zero.",
builder);
// NOTE: We intentionally do NOT reject !AllowMultipleEventSubscribers &&
// MaxEventSubscribersPerSession > 1 as a hard validation error here. The default
// SessionOptions ships with AllowMultipleEventSubscribers=false and
// MaxEventSubscribersPerSession=8; making those defaults a validation failure would
// break every deployment that has not explicitly set the cap. The cap is simply
// ignored in single-subscriber mode (AttachEventSubscriber derives effectiveCap=1),
// so the only practical consequence of the apparent inconsistency is a dead config
// knob, not incorrect behavior.
}
private static void ValidateEvents(EventOptions options, ValidationBuilder builder)
@@ -23,6 +23,28 @@ public sealed class SessionOptions
/// <summary>Gets the interval for sweeping expired session leases in seconds.</summary>
public int LeaseSweepIntervalSeconds { get; init; } = 30;
/// <summary>
/// Gets the detach-grace retention window, in seconds, that a session is kept alive
/// after its last external (gRPC) event-stream subscriber drops, so a client can
/// reconnect to it. While within the window the session stays in
/// <c>Ready</c> and remains usable; if no new external subscriber attaches before the
/// window elapses, the lease monitor closes the session exactly as it closes an
/// expired lease. The gateway-owned internal dashboard subscriber does not count as an
/// external subscriber, so a session whose only remaining subscriber is the dashboard
/// mirror still enters detach-grace. A value of <c>0</c> disables retention: the
/// session reverts to the original behavior of lingering only until its normal lease
/// expires. The reconnect/replay itself is implemented separately (Task 12); this
/// option controls retention and expiry only.
/// </summary>
/// <remarks>
/// The effective close happens within the next sweep cycle after the window elapses —
/// up to <see cref="LeaseSweepIntervalSeconds"/> after expiry. Operators who want a
/// firm minimum bound should set <c>DetachGraceSeconds</c> greater than
/// <see cref="LeaseSweepIntervalSeconds"/>; otherwise a session whose window expires
/// just before a sweep run may be closed within seconds of detach.
/// </remarks>
public int DetachGraceSeconds { get; init; } = 30;
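The interaction between the retention window and the sweep cadence described in the remarks can be modelled with a little date arithmetic. The sketch below is hypothetical (`DetachGraceTiming` is not a type in this codebase) and assumes the lease monitor sweeps on a fixed cadence starting at a known reference sweep; the real monitor's scheduling is not shown in this diff.

```csharp
using System;

// Hypothetical timing model; names and the fixed-cadence assumption are illustrative.
public static class DetachGraceTiming
{
    // True once the retention window has elapsed. A non-positive grace means
    // retention is disabled, so the window is never considered "elapsed" here.
    public static bool IsGraceElapsed(DateTimeOffset? detachedAtUtc, TimeSpan grace, DateTimeOffset now)
        => grace > TimeSpan.Zero
           && detachedAtUtc is DateTimeOffset detachedAt
           && now - detachedAt >= grace;

    // First sweep at or after the window expiry, assuming sweeps run at
    // referenceSweep + k * sweepInterval for k = 0, 1, 2, ...
    public static DateTimeOffset FirstClosingSweep(
        DateTimeOffset detachedAtUtc,
        TimeSpan grace,
        DateTimeOffset referenceSweep,
        TimeSpan sweepInterval)
    {
        if (sweepInterval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(sweepInterval));
        }

        DateTimeOffset expiry = detachedAtUtc + grace;
        if (expiry <= referenceSweep)
        {
            return referenceSweep;
        }

        // Ceiling division in ticks: how many whole intervals until expiry is covered.
        long ticksToExpiry = (expiry - referenceSweep).Ticks;
        long intervals = (ticksToExpiry + sweepInterval.Ticks - 1) / sweepInterval.Ticks;
        return referenceSweep + TimeSpan.FromTicks(intervals * sweepInterval.Ticks);
    }
}
```

With the defaults shown in this file (a 30-second grace and a 30-second sweep interval), a session that detaches 10 seconds after a sweep has its window expire 40 seconds after that sweep, so under this model it is closed at the 60-second sweep, 50 seconds after detach.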
/// <summary>
/// Gets a value indicating whether multiple event subscribers are allowed per session.
/// </summary>
@@ -34,4 +56,15 @@ public sealed class SessionOptions
/// effectively 1 when it is <see langword="false"/>. Must be greater than zero.
/// </summary>
public int MaxEventSubscribersPerSession { get; init; } = 8;
/// <summary>
/// Gets the bounded time, in milliseconds, the gateway will wait for a worker client
/// to reach <c>Ready</c> when the session itself is already <c>Ready</c> but the worker
/// state has transiently diverged (e.g. <c>Handshaking</c> after a heartbeat blip).
/// The wait applies only to transient worker states; terminal states
/// (<c>Faulted</c>/<c>Closing</c>/<c>Closed</c>/no worker) fail fast immediately.
/// A value of <c>0</c> (the default) disables the wait — the gateway keeps the original
/// fail-fast behavior. Must be greater than or equal to zero.
/// </summary>
public int WorkerReadyWaitTimeoutMs { get; init; }
}
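The behavior documented for `WorkerReadyWaitTimeoutMs` (wait only on transient states, fail fast on terminal ones, zero disables the wait) can be sketched as a bounded poll loop. This is a minimal illustration under stated assumptions, not the `GatewaySession` implementation: `WorkerStateSketch`, `BoundedReadyWait`, and the `pollInterval` parameter are hypothetical, and a `null` state stands in for "no worker".

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker client state enum.
public enum WorkerStateSketch { Handshaking, Ready, Faulted, Closing, Closed }

public static class BoundedReadyWait
{
    // Returns true if the worker is (or becomes) Ready within the timeout.
    // A single Stopwatch is the only clock, so the total wait never exceeds timeout
    // by more than one scheduling delay.
    public static async Task<bool> WaitForReadyAsync(
        Func<WorkerStateSketch?> readState,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken cancellationToken = default)
    {
        if (pollInterval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(pollInterval));
        }

        Stopwatch clock = Stopwatch.StartNew();
        while (true)
        {
            WorkerStateSketch? state = readState();
            if (state == WorkerStateSketch.Ready)
            {
                return true;
            }

            // Terminal states (and "no worker") never recover, so fail fast.
            bool terminal = state is null
                or WorkerStateSketch.Faulted
                or WorkerStateSketch.Closing
                or WorkerStateSketch.Closed;

            // timeout <= 0 reproduces the default "wait disabled" behavior.
            if (terminal || timeout <= TimeSpan.Zero)
            {
                return false;
            }

            TimeSpan remaining = timeout - clock.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                return false;
            }

            TimeSpan delay = remaining < pollInterval ? remaining : pollInterval;
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        }
    }
}
```

A caller would map the configured milliseconds to the `timeout` argument, for example `TimeSpan.FromMilliseconds(options.WorkerReadyWaitTimeoutMs)`; with the default of 0 the loop checks the state once and returns.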
@@ -0,0 +1,95 @@
using System.Security.Claims;
using System.Text.Encodings.Web;
using Microsoft.AspNetCore.Authentication;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Auth.AspNetCore;
using ZB.MOM.WW.MxGateway.Server.Configuration;
namespace ZB.MOM.WW.MxGateway.Server.Dashboard;
/// <summary>
/// Authentication handler used ONLY when <c>MxGateway:Dashboard:DisableLogin</c> is true.
/// Registered under the dashboard cookie scheme name
/// (<see cref="DashboardAuthenticationDefaults.AuthenticationScheme"/>), it authenticates
/// EVERY request as the configured dev user with both dashboard roles — no credential check,
/// no cookie, no LDAP bind. The minted principal mirrors the shape the real login
/// (<see cref="DashboardAuthenticator"/>) produces, so policies and the UI cannot tell it
/// apart. DEV/TEST ONLY; never enable in production.
/// </summary>
public sealed class DashboardAutoLoginAuthenticationHandler
: AuthenticationHandler<AuthenticationSchemeOptions>, IAuthenticationSignInHandler
{
/// <summary>Username used when <c>AutoLoginUser</c> is null or blank.</summary>
public const string DefaultUser = "multi-role";
private readonly string _user;
/// <summary>Initializes the handler with scheme plumbing and the dashboard options.</summary>
/// <param name="options">The per-scheme authentication options monitor.</param>
/// <param name="logger">The logger factory the base handler uses.</param>
/// <param name="encoder">The URL encoder the base handler uses.</param>
/// <param name="gatewayOptions">Gateway options carrying the dashboard auto-login user.</param>
public DashboardAutoLoginAuthenticationHandler(
IOptionsMonitor<AuthenticationSchemeOptions> options,
ILoggerFactory logger,
UrlEncoder encoder,
IOptions<GatewayOptions> gatewayOptions)
: base(options, logger, encoder)
{
string? configured = gatewayOptions.Value.Dashboard.AutoLoginUser;
_user = string.IsNullOrWhiteSpace(configured) ? DefaultUser : configured.Trim();
}
/// <summary>No-op: auto-login writes no cookie, so a sign-in has nothing to persist.</summary>
/// <param name="user">Ignored.</param>
/// <param name="properties">Ignored.</param>
/// <returns>A completed task.</returns>
public Task SignInAsync(ClaimsPrincipal user, AuthenticationProperties? properties) => Task.CompletedTask;
/// <summary>No-op: there is no auth cookie to clear; the next request re-authenticates.</summary>
/// <param name="properties">Ignored.</param>
/// <returns>A completed task.</returns>
public Task SignOutAsync(AuthenticationProperties? properties) => Task.CompletedTask;
/// <inheritdoc />
protected override Task<AuthenticateResult> HandleAuthenticateAsync()
{
ClaimsPrincipal principal = CreatePrincipal(_user);
AuthenticationTicket ticket = new(principal, Scheme.Name);
return Task.FromResult(AuthenticateResult.Success(ticket));
}
/// <summary>
/// Builds the multi-role dev principal. Null/blank <paramref name="user"/> falls back to
/// <see cref="DefaultUser"/>. The authorization-relevant claim shape mirrors
/// <see cref="DashboardAuthenticator"/>; LDAP group claims (<c>LdapGroupClaimType</c>) are
/// intentionally omitted because auto-login has no real LDAP context.
/// </summary>
/// <param name="user">The configured auto-login username (may be null/blank).</param>
/// <returns>An authenticated principal holding both dashboard roles.</returns>
internal static ClaimsPrincipal CreatePrincipal(string? user)
{
string name = string.IsNullOrWhiteSpace(user) ? DefaultUser : user.Trim();
// LdapGroupClaimType claims are omitted — no LDAP groups exist in the auto-login context.
Claim[] claims =
[
new Claim(ClaimTypes.NameIdentifier, name),
new Claim(ZbClaimTypes.Username, name),
new Claim(ZbClaimTypes.Name, name),
new Claim(ZbClaimTypes.DisplayName, name),
new Claim(ZbClaimTypes.Role, DashboardRoles.Admin),
new Claim(ZbClaimTypes.Role, DashboardRoles.Viewer),
];
ClaimsIdentity identity = new(
claims,
DashboardAuthenticationDefaults.AuthenticationScheme,
ZbClaimTypes.Name,
ZbClaimTypes.Role);
return new ClaimsPrincipal(identity);
}
}
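The reason the minted principal satisfies role-based policies is that the identity is constructed with explicit name and role claim types, which `IsInRole` and `Identity.Name` then consult. The sketch below shows that mechanism using only `System.Security.Claims`; the claim type strings, the scheme name, and the role names are placeholders, since the real `ZbClaimTypes` and `DashboardRoles` values are not shown in this diff.

```csharp
using System;
using System.Security.Claims;

// Hypothetical illustration of the principal shape; not the dashboard's own code.
public static class AutoLoginPrincipalSketch
{
    // Placeholder claim types standing in for ZbClaimTypes.Name / ZbClaimTypes.Role.
    public const string NameClaim = "name";
    public const string RoleClaim = "role";

    public static ClaimsPrincipal Create(string? user, string fallbackUser, params string[] roles)
    {
        // Null or blank input falls back to the default user, as in the handler above.
        string name = string.IsNullOrWhiteSpace(user) ? fallbackUser : user.Trim();

        // A non-empty authentication type is what makes IsAuthenticated true.
        var identity = new ClaimsIdentity(
            authenticationType: "sketch-scheme",
            nameType: NameClaim,
            roleType: RoleClaim);

        identity.AddClaim(new Claim(NameClaim, name));
        foreach (string role in roles)
        {
            identity.AddClaim(new Claim(RoleClaim, role));
        }

        return new ClaimsPrincipal(identity);
    }
}
```

If the role claims were added under a type other than the identity's `roleType`, `IsInRole` would return false even though the claims are present, which is why the handler passes the role claim type to the `ClaimsIdentity` constructor.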
@@ -2,6 +2,7 @@ using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Auth.Abstractions.Roles;
using ZB.MOM.WW.Auth.AspNetCore;
@@ -21,7 +22,8 @@ public static class DashboardServiceCollectionExtensions
/// <param name="services">Service collection to register services.</param>
/// <param name="configuration">
/// Application configuration, used to bind the shared LDAP provider's options
-/// from the <c>MxGateway:Ldap</c> section.
+/// from the <c>MxGateway:Ldap</c> section. Also read to select the dashboard
+/// authentication scheme via the <c>MxGateway:Dashboard:DisableLogin</c> dev flag.
/// </param>
public static IServiceCollection AddGatewayDashboard(
this IServiceCollection services,
@@ -55,9 +57,39 @@ public static class DashboardServiceCollectionExtensions
.AddInteractiveServerComponents();
services.AddSignalR();
-services
-.AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme)
-.AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme, cookieOptions =>
// DEV/TEST ONLY. Read directly from configuration here because authentication scheme
// registration runs before options binding. Key mirrors DashboardOptions.DisableLogin.
bool disableLogin = configuration.GetValue<bool>("MxGateway:Dashboard:DisableLogin");
AuthenticationBuilder authentication =
services.AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme);
if (disableLogin)
{
// Register an always-authenticating handler UNDER the cookie scheme name, so the
// Viewer/Admin/HubClients policies (which all resolve this scheme) authenticate
// through it as the multi-role dev user — zero policy or page changes.
authentication.AddScheme<AuthenticationSchemeOptions, DashboardAutoLoginAuthenticationHandler>(
DashboardAuthenticationDefaults.AuthenticationScheme,
_ => { });
// Loud warning, emitted on first resolution of GatewayOptions (i.e. on the first
// request/options access, not guaranteed at process start). Dev-only safety notice.
services.AddOptions<GatewayOptions>().PostConfigure<ILoggerFactory>((gatewayOptions, loggerFactory) =>
loggerFactory
.CreateLogger("ZB.MOM.WW.MxGateway.Server.Dashboard.DisableLogin")
.LogWarning(
"DASHBOARD LOGIN DISABLED (MxGateway:Dashboard:DisableLogin=true) — every request is "
+ "authenticated as '{User}' with full permissions ({Roles}). Dev/test only; never "
+ "enable in production.",
string.IsNullOrWhiteSpace(gatewayOptions.Dashboard.AutoLoginUser)
? DashboardAutoLoginAuthenticationHandler.DefaultUser
: gatewayOptions.Dashboard.AutoLoginUser!.Trim(),
$"{DashboardRoles.Admin}, {DashboardRoles.Viewer}"));
}
else
{
authentication.AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme, cookieOptions =>
{
// Hardened defaults (HttpOnly, SameSite=Strict, SecurePolicy, SlidingExpiration,
// ExpireTimeSpan) via the shared ZbCookieDefaults.Apply. requireHttps is set to
@@ -73,10 +105,12 @@ public static class DashboardServiceCollectionExtensions
cookieOptions.LoginPath = "/login";
cookieOptions.LogoutPath = "/logout";
cookieOptions.AccessDeniedPath = "/denied";
-})
-.AddScheme<AuthenticationSchemeOptions, HubTokenAuthenticationHandler>(
-DashboardAuthenticationDefaults.HubAuthenticationScheme,
-_ => { });
+});
+}
+authentication.AddScheme<AuthenticationSchemeOptions, HubTokenAuthenticationHandler>(
+DashboardAuthenticationDefaults.HubAuthenticationScheme,
+_ => { });
// Honour DashboardOptions.RequireHttpsCookie (default true / Always; set false for dev
// HTTP deployments → SameAsRequest) and the optional per-environment cookie-name
@@ -106,6 +106,13 @@ public sealed class GalaxyHierarchyIndex
{
parentKey = 0;
}
// Re-root orphans whose parent object is absent from the set (e.g. a deleted or
// never-loaded container area). Otherwise they bucket under a phantom parent id
// that is never reached from the root, so they vanish from browse entirely.
else if (parentKey != 0 && !objectsById.ContainsKey(parentKey))
{
parentKey = 0;
}
if (!childrenByParent.TryGetValue(parentKey, out List<GalaxyObjectView>? bucket))
{
bucket = [];
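The re-rooting rule added in this hunk can be shown in isolation: any object whose parent id is not in the loaded set is bucketed under the root key `0` instead of a phantom parent. The following is a minimal sketch with hypothetical types (`NodeSketch`, `HierarchySketch`); it reproduces only the orphan rule, not the other parent-key normalisation whose condition is outside this hunk.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for GalaxyObjectView with just the fields the rule needs.
public sealed record NodeSketch(int Id, int ParentId, string Name);

public static class HierarchySketch
{
    // Groups nodes by parent id, re-rooting orphans (parent absent from the set) under 0.
    public static Dictionary<int, List<NodeSketch>> GroupByParent(IReadOnlyList<NodeSketch> nodes)
    {
        var knownIds = new HashSet<int>();
        foreach (NodeSketch node in nodes)
        {
            knownIds.Add(node.Id);
        }

        var childrenByParent = new Dictionary<int, List<NodeSketch>>();
        foreach (NodeSketch node in nodes)
        {
            int parentKey = node.ParentId;

            // Orphan: the parent was deleted or never loaded, so attach to the root.
            if (parentKey != 0 && !knownIds.Contains(parentKey))
            {
                parentKey = 0;
            }

            if (!childrenByParent.TryGetValue(parentKey, out List<NodeSketch>? bucket))
            {
                bucket = new List<NodeSketch>();
                childrenByParent[parentKey] = bucket;
            }

            bucket.Add(node);
        }

        return childrenByParent;
    }
}
```

Without the orphan check, a node with `ParentId = 99` and no object 99 in the set would sit in bucket 99, which a traversal starting at key 0 never visits.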
@@ -172,6 +172,11 @@ public sealed class GalaxyRepository(GalaxyRepositoryOptions options) : IGalaxyR
AckCommentSubtag = string.Empty,
};
// Area objects (category 13) are returned even when undeployed (deployed_package_id = 0):
// they are organizational/model nodes that group deployed objects, so excluding them
// orphans every area whose containing area is not itself deployed. All non-area objects
// still require deployment. Orphans left by a missing/deleted parent area are re-rooted
// by GalaxyHierarchyIndex.Build so nothing disappears from browse.
private const string HierarchySql = @"
;WITH template_chain AS (
SELECT g.gobject_id AS instance_gobject_id, t.gobject_id AS template_gobject_id,
@@ -218,7 +223,7 @@ INNER JOIN template_definition td
ON g.template_definition_id = td.template_definition_id
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND g.is_template = 0
-AND g.deployed_package_id <> 0
+AND (g.deployed_package_id <> 0 OR td.category_id = 13)
ORDER BY parent_gobject_id, g.tag_name";
// Unlike HierarchySql, this query has diverged from the OtOpcUa original. It returns two
@@ -68,17 +68,79 @@ public sealed class EventStreamService(
// No `using` here — subscriber.Dispose() is called exactly once in the finally
// block below, which also disposes the reader. A `using` declaration would add a
// second Dispose on the same path and double-decrement the session subscriber count.
-IEventSubscriberLease subscriber = session.AttachEventSubscriber(
-options.Value.Sessions.AllowMultipleEventSubscribers);
// The subscriber mode (single vs. multi) is derived inside AttachEventSubscriber from
// the session's own SessionEventStreaming.AllowMultipleEventSubscribers field — the
// same source the distributor uses — so the two cannot diverge.
//
// Reconnect/resume (Task 12): when AfterWorkerSequence > 0 the client is resuming, so
// attach via the replay variant that atomically snapshots the replay ring AND registers
// the live subscriber under one lock. That single critical section is the crux of the
// no-gap/no-duplicate handoff: every replayed event has sequence <= LiveResumeSequence
// and every live event delivered below is filtered to sequence > LiveResumeSequence, so
// an event that was both replayed and (racing the registration) fanned into the live
// channel is dropped exactly once, while no newer event is skipped. See
// SessionEventDistributor.RegisterWithReplay for the full argument.
//
// AfterWorkerSequence == 0 (fresh stream, not a resume) keeps the pre-Task-12 behavior:
// a plain attach, no replay, no sentinel, and the live filter watermark stays 0.
ulong afterWorkerSequence = request.AfterWorkerSequence;
IEventSubscriberLease subscriber;
IReadOnlyList<MxEvent> replayedEvents = [];
bool replayGap = false;
ulong oldestAvailableSequence = 0;
if (afterWorkerSequence > 0)
{
EventSubscriberReplayAttachment attachment = session.AttachEventSubscriberWithReplay(
options.Value.Sessions.MaxEventSubscribersPerSession,
afterWorkerSequence);
subscriber = attachment.Lease;
replayedEvents = attachment.ReplayedEvents;
replayGap = attachment.Gap;
oldestAvailableSequence = attachment.OldestAvailableSequence;
// The live filter resumes strictly after the last replayed sequence (or, when
// nothing was replayed, after the requested watermark). This is what makes the
// handoff free of duplicates: anything <= this watermark was already replayed.
afterWorkerSequence = attachment.LiveResumeSequence;
}
else
{
subscriber = session.AttachEventSubscriber(
options.Value.Sessions.MaxEventSubscribersPerSession);
}
int streamQueueDepth = 0;
-ulong afterWorkerSequence = request.AfterWorkerSequence;
IAsyncEnumerator<MxEvent> reader = subscriber.Reader
.ReadAllAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken);
try
{
// Emit order for a resume: the ReplayGap sentinel FIRST (only when events were
// evicted), then the still-retained replay batch, then live. The sentinel is an
// explicit documented control signal (not a synthesized MXAccess event) and is
// delivered ONLY to this resuming subscriber — it is never fanned to other
// subscribers and never appears in DrainEventsReply (that path is untouched).
if (replayGap)
{
yield return CreateReplayGapSentinel(
request.SessionId,
request.AfterWorkerSequence,
oldestAvailableSequence);
}
foreach (MxEvent replayedEvent in replayedEvents)
{
// RegisterWithReplay already returns only events strictly newer than
// AfterWorkerSequence, so no per-item sequence guard is needed here.
// There is no per-event constraint filter on the event stream: events are
// fanned as-is by the distributor pump. The only dedup watermark is the
// LiveResumeSequence applied in the live loop below (to drop any event
// that was both replayed and raced into the live channel).
yield return replayedEvent;
}
while (true)
{
MxEvent mxEvent;
@@ -141,4 +203,24 @@ public sealed class EventStreamService(
metrics.StreamDisconnected("Detached");
}
}
// Builds the single ReplayGap control sentinel emitted at the head of a resumed
// StreamEvents stream when the requested AfterWorkerSequence predates the oldest event
// still retained (events were evicted). Per the proto contract (MxEvent.replay_gap),
// the sentinel carries the session id and the populated ReplayGap, with family
// UNSPECIFIED, no body, and no per-item fields. It is a documented control signal — NOT a
// synthesized MXAccess event — so emitting it does not violate the no-synthesis rule.
private static MxEvent CreateReplayGapSentinel(
string sessionId,
ulong requestedAfterSequence,
ulong oldestAvailableSequence)
=> new()
{
SessionId = sessionId,
ReplayGap = new ReplayGap
{
RequestedAfterSequence = requestedAfterSequence,
OldestAvailableSequence = oldestAvailableSequence,
},
};
}
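The no-gap, no-duplicate handoff described in the comments above comes down to a single watermark: everything at or below `LiveResumeSequence` was already replayed, so the live loop drops it. As a minimal sketch over bare sequence numbers (the type and method names are hypothetical, and the real live loop is not shown in this hunk):

```csharp
using System.Collections.Generic;

// Hypothetical illustration of the replay-then-live ordering; not the service's code.
public static class ReplayHandoffSketch
{
    // Yields the replay batch first, then only those live sequences strictly
    // greater than the resume watermark.
    public static List<ulong> Deliver(
        IEnumerable<ulong> replayed,
        IEnumerable<ulong> live,
        ulong liveResumeSequence)
    {
        var delivered = new List<ulong>();
        delivered.AddRange(replayed);

        foreach (ulong sequence in live)
        {
            // An event that raced into the live channel while it was also being
            // snapshotted for replay has sequence <= watermark and is skipped.
            if (sequence <= liveResumeSequence)
            {
                continue;
            }

            delivered.Add(sequence);
        }

        return delivered;
    }
}
```

For instance, if events 6, 7 and 8 were replayed and event 8 was also fanned into the live channel ahead of 9 and 10, the subscriber still receives 6, 7, 8, 9, 10 with 8 appearing once.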
@@ -949,6 +949,7 @@ public sealed class MxAccessGatewayService(
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventSubscriberLimitReached => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
@@ -0,0 +1,43 @@
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// The result of a reconnect/resume attach
/// (<see cref="GatewaySession.AttachEventSubscriberWithReplay"/>, Task 12): the live
/// subscriber lease plus the replay batch and resume watermarks snapshotted atomically
/// with the registration, so the replay→live handoff has no gap and no duplicate.
/// </summary>
/// <param name="Lease">
/// The live event subscriber lease. Disposing it unregisters the distributor subscriber
/// and decrements the session's active-subscriber count, exactly as a fresh attach.
/// </param>
/// <param name="ReplayedEvents">
/// Retained events with worker sequence strictly greater than the requested
/// <c>afterSequence</c>, in ascending order. These must be yielded (after the optional
/// gap sentinel) before live events. Never null; empty when nothing newer is retained.
/// </param>
/// <param name="Gap">
/// <see langword="true"/> when events between the requested <c>afterSequence</c> and the
/// oldest retained event were already evicted, so the client missed unrecoverable events.
/// When <see langword="true"/> the caller emits a <c>ReplayGap</c> sentinel before the
/// replay batch.
/// </param>
/// <param name="OldestAvailableSequence">
/// The oldest worker sequence still retained and replayable; <c>0</c> when nothing is
/// retained. Populates the <c>ReplayGap.oldest_available_sequence</c> field. Meaningful
/// only when <paramref name="Gap"/> is <see langword="true"/>.
/// </param>
/// <param name="LiveResumeSequence">
/// The worker sequence the live channel must resume strictly after: the highest replayed
/// sequence, or the requested <c>afterSequence</c> when nothing was replayed. The caller
/// applies this as the per-subscriber live filter so any event both replayed and fanned
/// into the live channel is dropped exactly once (no duplicate) while every newer event
/// is delivered (no gap).
/// </param>
public readonly record struct EventSubscriberReplayAttachment(
IEventSubscriberLease Lease,
IReadOnlyList<MxEvent> ReplayedEvents,
bool Gap,
ulong OldestAvailableSequence,
ulong LiveResumeSequence);
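How the four values of the attachment relate to the ring contents can be illustrated with a pure function over retained sequence numbers. This is a sketch under assumptions, not `SessionEventDistributor.RegisterWithReplay`: the names are hypothetical, the ring is modelled as an ascending list, locking is omitted, and an empty ring is reported as "no gap" because this model has no record of what was evicted.

```csharp
using System.Collections.Generic;

// Hypothetical mirror of the attachment's value fields, over bare sequence numbers.
public readonly record struct ReplaySnapshotSketch(
    IReadOnlyList<ulong> Replayed,
    bool Gap,
    ulong OldestAvailable,
    ulong LiveResume);

public static class ReplayRingSketch
{
    // retained: worker sequences currently held in the ring, ascending.
    public static ReplaySnapshotSketch Snapshot(IReadOnlyList<ulong> retained, ulong afterSequence)
    {
        var replayed = new List<ulong>();
        foreach (ulong sequence in retained)
        {
            if (sequence > afterSequence)
            {
                replayed.Add(sequence);
            }
        }

        // 0 when nothing is retained, matching the documented convention.
        ulong oldest = retained.Count > 0 ? retained[0] : 0;

        // A gap exists when at least one sequence lies strictly between
        // afterSequence and the oldest retained one. Written as a subtraction
        // so that afterSequence + 1 cannot overflow.
        bool gap = oldest > afterSequence && oldest - afterSequence > 1;

        // Highest replayed sequence, or the requested watermark if nothing was replayed.
        ulong liveResume = replayed.Count > 0 ? replayed[^1] : afterSequence;

        return new ReplaySnapshotSketch(replayed, gap, oldest, liveResume);
    }
}
```

With sequences 20 to 22 retained, resuming after 10 reports a gap with `OldestAvailable = 20` and `LiveResume = 22`, while resuming after 19 replays the same three events without a gap because nothing lies between 19 and 20.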
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
@@ -21,6 +22,9 @@ public sealed class GatewaySession
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
private readonly TimeSpan _detachGrace;
private readonly TimeSpan _workerReadyWaitTimeout;
private DateTimeOffset? _detachedAtUtc;
private SessionEventDistributor? _eventDistributor;
private bool _eventDistributorStarted;
private bool _dashboardMirrorStarted;
@@ -102,6 +106,27 @@ public sealed class GatewaySession
/// session directly still get a working distributor. Production passes the
/// DI-resolved dependencies.
/// </param>
/// <param name="detachGrace">
/// Retention window kept after the last external (gRPC) event subscriber drops, so a
/// client can reconnect (Task 12). When the window is positive and the active external
/// subscriber count falls to zero, the session stays <see cref="SessionState.Ready"/>
/// and records a detached timestamp; the lease monitor closes it once the window
/// elapses with no subscriber having re-attached. <see cref="TimeSpan.Zero"/> (the
/// default) disables retention and preserves the original lease-only expiry behavior.
/// The clock comes from <paramref name="eventStreaming"/>'s
/// <see cref="SessionEventStreaming.TimeProvider"/> so the timer is unit-testable.
/// </param>
/// <param name="workerReadyWaitTimeout">
/// Bounded time the session will wait, on the command/event hot path, for the worker
/// client to reach <see cref="WorkerClientState.Ready"/> when the session is already
/// <see cref="SessionState.Ready"/> but the worker state has transiently diverged
/// (e.g. <see cref="WorkerClientState.Handshaking"/> after a heartbeat blip). The wait
/// applies only to transient worker states; terminal states
/// (<see cref="WorkerClientState.Faulted"/>/<see cref="WorkerClientState.Closing"/>/
/// <see cref="WorkerClientState.Closed"/>/no worker) and a non-<c>Ready</c> session fail
/// fast immediately. <see cref="TimeSpan.Zero"/> (the default) disables the wait and
/// preserves the original fail-fast behavior byte-for-byte.
/// </param>
public GatewaySession(
string sessionId,
string backendName,
@@ -116,7 +141,9 @@ public sealed class GatewaySession
TimeSpan shutdownTimeout,
TimeSpan leaseDuration,
DateTimeOffset openedAt,
-SessionEventStreaming? eventStreaming = null)
+SessionEventStreaming? eventStreaming = null,
+TimeSpan detachGrace = default,
+TimeSpan workerReadyWaitTimeout = default)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -154,6 +181,8 @@ public sealed class GatewaySession
_lastClientActivityAt = openedAt;
_leaseExpiresAt = openedAt + leaseDuration;
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
_detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero;
_workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero;
}
/// <summary>
@@ -299,6 +328,25 @@ public sealed class GatewaySession
}
}
/// <summary>
/// Gets the UTC timestamp at which the session entered its detach-grace retention
/// window (the last external event subscriber dropped while a positive
/// detach-grace was configured), or <see langword="null"/> when the session is not
/// currently within a detach-grace window. Re-attaching an external subscriber clears
/// this. Always <see langword="null"/> when detach-grace is disabled
/// (<c>DetachGraceSeconds == 0</c>).
/// </summary>
public DateTimeOffset? DetachedAtUtc
{
get
{
lock (_syncRoot)
{
return _detachedAtUtc;
}
}
}
/// <summary>
/// Attaches the worker client for this session.
/// </summary>
@@ -399,6 +447,32 @@ public sealed class GatewaySession
return lease;
}
// Reconnect/resume variant of StartDistributorAndRegister (Task 12). Snapshots the replay
// ring for events newer than afterSequence AND registers the live subscriber atomically
// under the distributor's replay lock, so the replay→live handoff has no gap and no
// duplicate (see SessionEventDistributor.RegisterWithReplay). The pump is started after
// registration, exactly as the fresh-attach path, so the very first subscriber on a
// freshly-Ready session still sees the stream from its beginning.
private IEventSubscriberLease StartDistributorAndRegisterWithReplay(
ulong afterSequence,
out IReadOnlyList<MxEvent> replayedEvents,
out bool gap,
out ulong oldestAvailableSequence,
out ulong liveResumeSequence)
{
SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
IEventSubscriberLease lease = distributor.RegisterWithReplay(
afterSequence,
out replayedEvents,
out gap,
out oldestAvailableSequence,
out liveResumeSequence);
StartPumpIfRequested(distributor, startNow);
return lease;
}
// Constructs the distributor exactly once and reports whether THIS caller is the one
// that should start the pump (i.e. it observed the unstarted state and claimed the
// start). Both the construction and the started-flag flip happen under _syncRoot so two
@@ -419,7 +493,8 @@ public sealed class GatewaySession
eventOptions.ReplayRetentionSeconds,
_eventStreaming.DistributorLogger,
_eventStreaming.TimeProvider,
CreateOverflowHandler(eventOptions.BackpressurePolicy));
CreateOverflowHandler(eventOptions.BackpressurePolicy),
singleSubscriberMode: !_eventStreaming.AllowMultipleEventSubscribers);
}
startNow = false;
@@ -677,17 +752,61 @@ public sealed class GatewaySession
}
}
/// <summary>
/// Determines whether the session's detach-grace retention window has elapsed: the
/// session entered detach-grace (its last external event subscriber dropped while a
/// positive detach-grace was configured) and has had no external subscriber re-attach
/// for longer than the configured detach-grace. The lease monitor closes such a
/// session exactly as it closes an expired lease. Always returns <see langword="false"/>
/// when detach-grace is disabled or when an external subscriber is attached (the
/// detached timestamp is cleared on re-attach, so an attached session is never within a
/// window).
/// </summary>
/// <param name="now">Current timestamp for comparison.</param>
public bool IsDetachGraceExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _detachGrace > TimeSpan.Zero
&& _activeEventSubscriberCount == 0
&& _detachedAtUtc is not null
&& now - _detachedAtUtc.Value >= _detachGrace;
}
}
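The expiry predicate above can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical free-standing function `IsDetachGraceExpired` that takes the session fields as parameters (the real method reads them under `_syncRoot`); it only illustrates the boundary behavior of the four conditions.

```csharp
using System;

// Hypothetical stand-alone copy of the predicate; parameters replace the session fields.
static bool IsDetachGraceExpired(
    TimeSpan detachGrace,
    int activeSubscribers,
    DateTimeOffset? detachedAtUtc,
    DateTimeOffset now)
    => detachGrace > TimeSpan.Zero
       && activeSubscribers == 0
       && detachedAtUtc is not null
       && now - detachedAtUtc.Value >= detachGrace;

DateTimeOffset detachedAt = new(2026, 6, 16, 12, 0, 0, TimeSpan.Zero);
TimeSpan grace = TimeSpan.FromSeconds(30);

// One second short of the window: not expired.
bool beforeWindow = IsDetachGraceExpired(grace, 0, detachedAt, detachedAt.AddSeconds(29));
// Exactly at the window: expired, because the comparison is >=.
bool atBoundary = IsDetachGraceExpired(grace, 0, detachedAt, detachedAt.AddSeconds(30));
// A re-attach clears the timestamp and raises the count: never expired.
bool reattached = IsDetachGraceExpired(grace, 1, null, detachedAt.AddSeconds(60));
// Detach-grace disabled: never expired, however old the timestamp is.
bool disabled = IsDetachGraceExpired(TimeSpan.Zero, 0, detachedAt, detachedAt.AddHours(1));

Console.WriteLine($"{beforeWindow} {atBoundary} {reattached} {disabled}");
```

The `>=` comparison means a session becomes eligible for close at the exact instant the grace period has elapsed, not one tick later.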
/// <summary>
/// Attaches an event subscriber and returns a lease whose
/// <see cref="IEventSubscriberLease.Reader"/> reads the fanned public
/// <see cref="MxEvent"/>s for this subscriber. The single-subscriber guard
/// (Tasks 7/8 relax it) is unchanged: with multi-subscriber disabled a second
/// attach is rejected. The returned lease, when disposed, unregisters the
/// distributor subscriber AND decrements the active-subscriber count.
/// <see cref="MxEvent"/>s for this subscriber. The returned lease, when disposed,
/// unregisters the distributor subscriber AND decrements the active-subscriber count.
/// </summary>
/// <param name="allowMultipleSubscribers">If true, allows multiple concurrent event subscribers.</param>
public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers)
/// <param name="maxSubscribers">
/// Maximum concurrent external subscribers in multi-subscriber mode
/// (<c>MxGateway:Sessions:MaxEventSubscribersPerSession</c>). Ignored when the
/// session is in single-subscriber mode (<c>AllowMultipleEventSubscribers == false</c>);
/// the effective cap is then 1. The gateway-owned internal dashboard subscriber is
/// registered directly on the distributor and is NOT counted here, so it never
/// consumes cap budget.
/// </param>
/// <remarks>
/// The subscriber mode is derived internally from
/// <see cref="SessionEventStreaming.AllowMultipleEventSubscribers"/> — the same source
/// the <see cref="SessionEventDistributor"/> uses to gate its FailFast decision — so
/// the cap-enforcement mode and the distributor's <c>singleSubscriberMode</c> field
/// cannot diverge. The count-check-and-increment runs atomically under
/// <c>_syncRoot</c>, so two concurrent attaches racing toward the cap can never both
/// succeed past it. On distributor-register failure the count is rolled back (see the
/// catch below).
/// </remarks>
public IEventSubscriberLease AttachEventSubscriber(int maxSubscribers)
{
// Derive the mode from the same source the distributor uses so the two can never
// diverge. Effective cap: 1 in single-subscriber mode, otherwise the configured
// maximum (clamped to at least 1 so a misconfigured non-positive value can never
// deadlock attaches in multi-subscriber mode).
bool allowMultipleSubscribers = _eventStreaming.AllowMultipleEventSubscribers;
int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1;
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
@@ -697,14 +816,24 @@ public sealed class GatewaySession
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
if (_activeEventSubscriberCount >= effectiveCap)
{
throw new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
throw allowMultipleSubscribers
? new SessionManagerException(
SessionManagerErrorCode.EventSubscriberLimitReached,
$"Session {SessionId} has reached its maximum of {effectiveCap} concurrent event stream subscribers.")
: new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
// An external subscriber (re)attached: cancel any in-flight detach-grace window so
// the lease monitor no longer treats this session as eligible for grace-expiry
// close. This is the reattach→grace-cancel transition; it races the sweeper's
// IsDetachGraceExpired read, and both run under _syncRoot so they serialize.
_detachedAtUtc = null;
}
// Construct/start the distributor and register this subscriber. Done outside the
@@ -722,6 +851,75 @@ public sealed class GatewaySession
}
}
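The cap enforcement described in the remarks can be illustrated with a small stand-alone sketch. It assumes hypothetical names (`TryAttach`, `syncRoot`, `activeCount`) and returns a bool where the real method throws; the point is only that the count check and the increment share one lock, so concurrent callers cannot overshoot the cap.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

object syncRoot = new();
int activeCount = 0;

// Hypothetical simplification of the attach path: check and increment under one lock.
bool TryAttach(bool allowMultipleSubscribers, int maxSubscribers)
{
    // Effective cap is 1 in single-subscriber mode, otherwise the configured maximum
    // clamped to at least 1 so a non-positive value cannot block every attach.
    int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1;

    lock (syncRoot)
    {
        if (activeCount >= effectiveCap)
        {
            return false;
        }

        activeCount++;
        return true;
    }
}

// 32 concurrent attaches race toward a cap of 4.
int succeeded = 0;
Parallel.For(0, 32, _ =>
{
    if (TryAttach(allowMultipleSubscribers: true, maxSubscribers: 4))
    {
        Interlocked.Increment(ref succeeded);
    }
});

Console.WriteLine($"succeeded={succeeded}, activeCount={activeCount}");
```

If the check and the increment were performed in separate lock acquisitions, two callers could both observe a count of 3 and both increment, ending at 5.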
/// <summary>
/// Reconnect/resume variant of <see cref="AttachEventSubscriber"/> (Task 12). Attaches
/// an event subscriber AND atomically snapshots the session replay ring for events newer
/// than <paramref name="afterSequence"/>, so a resuming client can replay what it missed
/// before live delivery resumes — with no gap and no duplicate across the handoff.
/// </summary>
/// <param name="maxSubscribers">See <see cref="AttachEventSubscriber"/>.</param>
/// <param name="afterSequence">
/// The last worker sequence the resuming client already observed. Replay returns events
/// strictly newer than this; the caller must filter the live channel to events strictly
/// newer than <see cref="EventSubscriberReplayAttachment.LiveResumeSequence"/>.
/// </param>
/// <returns>
/// The lease plus the replay batch, gap flag, and resume watermarks. See
/// <see cref="SessionEventDistributor.RegisterWithReplay"/> for the no-gap/no-duplicate
/// guarantee.
/// </returns>
public EventSubscriberReplayAttachment AttachEventSubscriberWithReplay(int maxSubscribers, ulong afterSequence)
{
bool allowMultipleSubscribers = _eventStreaming.AllowMultipleEventSubscribers;
int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1;
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (_activeEventSubscriberCount >= effectiveCap)
{
throw allowMultipleSubscribers
? new SessionManagerException(
SessionManagerErrorCode.EventSubscriberLimitReached,
$"Session {SessionId} has reached its maximum of {effectiveCap} concurrent event stream subscribers.")
: new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
_detachedAtUtc = null;
}
try
{
IEventSubscriberLease distributorLease = StartDistributorAndRegisterWithReplay(
afterSequence,
out IReadOnlyList<MxEvent> replayedEvents,
out bool gap,
out ulong oldestAvailableSequence,
out ulong liveResumeSequence);
return new EventSubscriberReplayAttachment(
new EventSubscriberLease(this, distributorLease),
replayedEvents,
gap,
oldestAvailableSequence,
liveResumeSequence);
}
catch
{
DetachEventSubscriber();
throw;
}
}
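How a caller might consume the replay attachment can be sketched as follows. This is an illustration under assumptions, not the gateway's streaming code: events are reduced to bare `ulong` sequences, and `Deliver` is a hypothetical helper that emits the replay batch and then filters the live channel to sequences strictly newer than the live-resume watermark.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer: replay first, then live events strictly newer than the watermark.
static List<ulong> Deliver(
    IReadOnlyList<ulong> replayed,
    ulong liveResumeSequence,
    IEnumerable<ulong> live)
{
    var delivered = new List<ulong>(replayed);
    foreach (ulong sequence in live)
    {
        // An event that was both replayed and fanned into the live channel has a
        // sequence <= liveResumeSequence and is dropped here, so it is not delivered twice.
        if (sequence > liveResumeSequence)
        {
            delivered.Add(sequence);
        }
    }

    return delivered;
}

ulong afterSequence = 4;
ulong[] replayed = { 5, 6, 7 };

// Highest replayed sequence, or afterSequence when nothing was replayed.
ulong liveResumeSequence = replayed.Length > 0 ? replayed[^1] : afterSequence;

// The live channel raced the registration and also received event 7.
List<ulong> delivered = Deliver(replayed, liveResumeSequence, new ulong[] { 7, 8, 9 });

Console.WriteLine(string.Join(",", delivered));
```

In this sketch the duplicate 7 from the live channel is filtered out while 8 and 9 pass through, which is the no-gap, no-duplicate handoff the XML docs describe.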
/// <summary>
/// Invokes a worker command synchronously and returns the reply.
/// </summary>
@@ -731,8 +929,8 @@ public sealed class GatewaySession
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
@@ -1074,12 +1272,20 @@ public sealed class GatewaySession
/// Reads events from the worker as an asynchronous enumerable stream.
/// </summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
/// <returns>An asynchronous stream of worker events.</returns>
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
return workerClient.ReadEventsAsync(cancellationToken);
await foreach (WorkerEvent workerEvent in workerClient
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return workerEvent;
}
}
/// <summary>
@@ -1171,6 +1377,74 @@ public sealed class GatewaySession
}
}
/// <summary>
/// Atomically re-verifies that the session is still eligible for sweep-initiated close
/// (lease expired OR detach-grace expired, with no active external subscriber) and, if so,
/// transitions to <c>Closing</c> in a single lock acquisition.
/// </summary>
/// <param name="now">Current timestamp used for expiry re-check.</param>
/// <param name="alreadyClosing">
/// Set to <see langword="true"/> when a concurrent close is already in flight; the caller
/// should treat the session as already being closed (same semantics as
/// <see cref="CloseAsync"/>).
/// </param>
/// <returns>
/// <see langword="true"/> when the state was flipped to <c>Closing</c> and the caller
/// should proceed with teardown; <see langword="false"/> when the session is already
/// closed OR is no longer eligible (a subscriber re-attached between the eligibility
/// check in the sweep loop and this call — the reconnect won the race and the session
/// should be left open).
/// </returns>
/// <remarks>
/// <para>
/// Race: <c>CloseExpiredLeasesAsync</c> evaluates <see cref="IsLeaseExpired"/> /
/// <see cref="IsDetachGraceExpired"/> outside the close lock, then calls
/// <see cref="CloseAsync"/> which takes <c>_closeLock</c>. A client can call
/// <see cref="AttachEventSubscriber"/> in between, clearing <c>_detachedAtUtc</c> and
/// incrementing <c>_activeEventSubscriberCount</c> — the session is no longer expired.
/// This method re-checks eligibility atomically under <c>_syncRoot</c> before
/// committing to <c>Closing</c>, so a reattach that wins the race leaves the session
/// in <c>Ready</c> and usable.
/// </para>
/// </remarks>
internal bool TryBeginCloseIfExpired(DateTimeOffset now, out bool alreadyClosing)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
alreadyClosing = _closeStarted;
return false;
}
// Re-verify eligibility atomically. If a subscriber reattached between the sweep's
// eligibility check and this point, neither condition holds and we decline.
bool eligible = IsLeaseExpiredCore(now) || IsDetachGraceExpiredCore(now);
if (!eligible)
{
alreadyClosing = false;
return false;
}
alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
return true;
}
}
// Non-locking helpers used by TryBeginCloseIfExpired; the caller must already hold _syncRoot.
private bool IsLeaseExpiredCore(DateTimeOffset now)
=> _activeEventSubscriberCount == 0
&& _leaseExpiresAt is not null
&& _leaseExpiresAt <= now;
private bool IsDetachGraceExpiredCore(DateTimeOffset now)
=> _detachGrace > TimeSpan.Zero
&& _activeEventSubscriberCount == 0
&& _detachedAtUtc is not null
&& now - _detachedAtUtc.Value >= _detachGrace;
// Final terminal transition; under _syncRoot to keep _state writes single-lock.
// Closed is unconditionally terminal — TransitionTo refuses to overwrite it —
// so we don't need to re-check the precondition here.
@@ -1398,34 +1672,136 @@ public sealed class GatewaySession
}
/// <summary>
/// Returns the worker client iff both the gateway-side session state AND
/// the worker client's own state are <see cref="SessionState.Ready"/> /
/// <see cref="WorkerClientState.Ready"/>. The two states can diverge under
/// load: <c>_state</c> only transitions on gateway-driven events (open,
/// close, fault), while <see cref="WorkerClient.State"/> can shift on
/// worker-side signals (heartbeat watchdog, pipe disconnect) before the
/// gateway's session-level reaction observes them. When that happens the
/// in-flight RPC fails fast here with both states surfaced in the
/// diagnostic (Server-030) so the actual mismatch is actionable instead
/// of misleading. The session usually transitions to <c>Faulted</c>
/// shortly after.
/// Bounded, opt-in async variant of the fail-fast readiness check. When the
/// session is <see cref="SessionState.Ready"/> but the worker has transiently diverged
/// to a non-terminal state (<see cref="WorkerClientState.Handshaking"/>/
/// <see cref="WorkerClientState.Created"/>) and the configured worker-ready wait timeout
/// is positive, this polls (outside <c>_syncRoot</c>) until the worker reaches
/// <see cref="WorkerClientState.Ready"/> or the deadline elapses, re-evaluating the
/// fast-path/fail-fast decision under the lock on each poll. Terminal worker states, a
/// missing worker, or a non-<c>Ready</c> session fail fast immediately. With the default
/// timeout of zero this behaves byte-for-byte like the synchronous fail-fast path: no
/// await, no delay.
/// </summary>
private IWorkerClient GetReadyWorkerClient()
/// <param name="cancellationToken">Token to cancel the wait.</param>
/// <returns>The worker client once both the session and worker are <c>Ready</c>.</returns>
private async Task<IWorkerClient> GetReadyWorkerClientAsync(CancellationToken cancellationToken)
{
const int pollIntervalMs = 25;
string? failureMessage;
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
if (ready is not null)
{
string workerState = _workerClient is null
? "<no worker>"
: _workerClient.State.ToString();
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
return ready;
}
// Only transient (non-terminal) worker states with a positive wait timeout fall
// through to the bounded wait loop. Everything else (terminal worker, no worker,
// session not Ready, or a zero timeout) fails fast right here under the lock. When
// the worker is merely transient (failureMessage is null) but the wait is disabled,
// build the both-states diagnostic here so the zero-timeout path throws the same
// fail-fast message as the original synchronous check.
if (failureMessage is not null || _workerReadyWaitTimeout <= TimeSpan.Zero)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
failureMessage ?? BuildNotReadyMessage());
}
}
DateTimeOffset deadline = _eventStreaming.TimeProvider.GetUtcNow() + _workerReadyWaitTimeout;
while (true)
{
await Task.Delay(
TimeSpan.FromMilliseconds(pollIntervalMs),
_eventStreaming.TimeProvider,
cancellationToken)
.ConfigureAwait(false);
lock (_syncRoot)
{
IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
if (ready is not null)
{
return ready;
}
// A terminal worker / missing worker / non-Ready session surfaced while we
// waited: fail fast immediately rather than burning the rest of the deadline.
if (failureMessage is not null)
{
throw new SessionManagerException(SessionManagerErrorCode.SessionNotReady, failureMessage);
}
}
if (_eventStreaming.TimeProvider.GetUtcNow() >= deadline)
{
lock (_syncRoot)
{
IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
if (ready is not null)
{
return ready;
}
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
failureMessage ?? BuildNotReadyMessage());
}
}
}
}
/// <summary>
/// Evaluates readiness while the caller already holds <c>_syncRoot</c>. Returns the
/// worker client when the session is <see cref="SessionState.Ready"/> and the worker is <see cref="WorkerClientState.Ready"/>
/// (with <paramref name="failureMessage"/> set to <see langword="null"/>). Returns
/// <see langword="null"/> together with the both-states diagnostic in
/// <paramref name="failureMessage"/> when the worker is in a terminal state
/// (<see cref="WorkerClientState.Faulted"/>/<see cref="WorkerClientState.Closing"/>/
/// <see cref="WorkerClientState.Closed"/>), there is no worker, or the session is not
/// <see cref="SessionState.Ready"/>. Returns <see langword="null"/> with a
/// <see langword="null"/> <paramref name="failureMessage"/> when the session is
/// <c>Ready</c> but the worker is in a transient state
/// (<see cref="WorkerClientState.Handshaking"/>/<see cref="WorkerClientState.Created"/>) —
/// the signal for the async path to keep waiting.
/// </summary>
/// <param name="failureMessage">
/// The fail-fast both-states diagnostic when readiness cannot succeed, or
/// <see langword="null"/> for the keep-waiting (transient) signal.
/// </param>
/// <returns>The ready worker client, or <see langword="null"/>.</returns>
private IWorkerClient? EvaluateReadyUnderLock(out string? failureMessage)
{
if (_state == SessionState.Ready && _workerClient?.State == WorkerClientState.Ready)
{
failureMessage = null;
return _workerClient;
}
// Keep-waiting signal: session is Ready and the worker is merely transient.
if (_state == SessionState.Ready
&& _workerClient is { State: WorkerClientState.Handshaking or WorkerClientState.Created })
{
failureMessage = null;
return null;
}
failureMessage = BuildNotReadyMessage();
return null;
}
/// <summary>Builds the both-states not-ready diagnostic (must be called under <c>_syncRoot</c>).</summary>
/// <returns>The diagnostic message surfacing both the session and worker states.</returns>
private string BuildNotReadyMessage()
{
string workerState = _workerClient is null
? "<no worker>"
: _workerClient.State.ToString();
return $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.";
}
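The bounded wait above follows a common shape: check once, fail fast if waiting is disabled, otherwise poll against a deadline taken from a single clock. Below is a minimal sketch of that shape, assuming a hypothetical `WaitUntilReadyAsync` helper and a `Func<bool>` probe in place of the locked state evaluation; it omits the terminal-state fail-fast branch and returns a bool where the real method throws.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe: reports ready on the third evaluation.
int pollsUntilReady = 3;
int polls = 0;
bool IsReady() => ++polls >= pollsUntilReady;

// Sketch of the bounded poll loop; one TimeProvider drives both the delay and the deadline.
static async Task<bool> WaitUntilReadyAsync(
    Func<bool> isReady,
    TimeSpan timeout,
    TimeSpan pollInterval,
    TimeProvider timeProvider,
    CancellationToken cancellationToken)
{
    if (isReady())
    {
        return true;
    }

    // A zero (or negative) timeout disables the wait: no await, no delay.
    if (timeout <= TimeSpan.Zero)
    {
        return false;
    }

    DateTimeOffset deadline = timeProvider.GetUtcNow() + timeout;
    while (true)
    {
        await Task.Delay(pollInterval, timeProvider, cancellationToken).ConfigureAwait(false);

        if (isReady())
        {
            return true;
        }

        if (timeProvider.GetUtcNow() >= deadline)
        {
            return false;
        }
    }
}

bool readyWithWait = await WaitUntilReadyAsync(
    IsReady, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(5), TimeProvider.System, CancellationToken.None);

polls = 0;
bool readyWithoutWait = await WaitUntilReadyAsync(
    IsReady, TimeSpan.Zero, TimeSpan.FromMilliseconds(5), TimeProvider.System, CancellationToken.None);

Console.WriteLine($"withWait={readyWithWait}, withoutWait={readyWithoutWait}");
```

Passing the same `TimeProvider` to `Task.Delay` and to the deadline check is what lets a test substitute a fake clock and advance both together.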
private void TrackItem(
@@ -1466,10 +1842,32 @@ public sealed class GatewaySession
{
lock (_syncRoot)
{
// Assert in debug so a genuine double-decrement (a logic error) surfaces
// loudly; the clamp below keeps release builds safe if it somehow fires.
Debug.Assert(_activeEventSubscriberCount > 0,
"DetachEventSubscriber called with _activeEventSubscriberCount already at 0 — possible double-dispose.");
if (_activeEventSubscriberCount > 0)
{
_activeEventSubscriberCount--;
}
// When the LAST external subscriber drops and detach-grace is enabled, retain the
// session instead of letting it linger only on the (long) lease: stamp the detached
// time so the lease monitor can close it once the grace window elapses. The session
// stays in its current (Ready) state and remains usable, so a reconnecting subscriber
// (Task 12) re-attaches normally. The gateway-owned internal dashboard subscriber is
// NOT counted in _activeEventSubscriberCount (it registers on the distributor with
// isInternal: true), so a session whose only remaining subscriber is the dashboard
// mirror still enters grace. Only stamp while the session is alive — once
// Closing/Closed/Faulted there is nothing to retain. This is the detach→grace-start
// transition; it shares _syncRoot with the reattach→grace-cancel write above and the
// sweeper's IsDetachGraceExpired read, so the three serialize.
if (_detachGrace > TimeSpan.Zero
&& _activeEventSubscriberCount == 0
&& _state is not (SessionState.Closing or SessionState.Closed or SessionState.Faulted))
{
_detachedAtUtc = _eventStreaming.TimeProvider.GetUtcNow();
}
}
}
@@ -13,12 +13,16 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// regardless of what the handler does.
/// </summary>
/// <param name="isOnlySubscriber">
/// <see langword="true"/> when the overflowing subscriber is the sole registered
/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
/// the session only in this case; with multiple subscribers FailFast degrades to a
/// per-subscriber disconnect so one slow consumer never faults a session shared by others.
/// Always <see langword="false"/> for internal subscribers (the dashboard mirror) because
/// <see cref="SessionEventDistributor"/> excludes them from the external-subscriber count.
/// <see langword="true"/> when FailFast is allowed to fault the whole session for this
/// overflow. As of Task 8 this is gated on the SESSION MODE, not a live count: it is
/// <see langword="true"/> only for an external subscriber in single-subscriber mode
/// (<c>AllowMultipleEventSubscribers == false</c>), where at most one external subscriber
/// can ever exist. In multi-subscriber mode it is always <see langword="false"/>, so
/// FailFast degrades to a per-subscriber disconnect and one slow consumer never faults a
/// session shared by others; gating on the fixed mode also removes the Task 5 race where a
/// concurrent registration could make a count snapshot falsely report a sole subscriber.
/// Always <see langword="false"/> for internal subscribers (the dashboard mirror) so a
/// slow/broken dashboard can never fault the session.
/// </param>
/// <param name="isInternal">
/// <see langword="true"/> when the overflowing subscriber is the gateway-owned internal
@@ -40,8 +44,10 @@ public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInt
/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
/// bounded channel and the pump applies the policy to that subscriber alone (see
/// <see cref="SubscriberOverflowHandler"/> and <c>OnSubscriberOverflow</c>), leaving
/// the pump, the session, and other subscribers running. The class does not yet
/// remove the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
/// the pump, the session, and other subscribers running. Task 8 made the
/// FailFast-faults-session decision mode-gated: it fires only in single-subscriber
/// mode (<c>singleSubscriberMode</c>), so multi-subscriber FailFast always degrades to
/// a per-subscriber disconnect — see <c>OnSubscriberOverflow</c>. The ring buffer supports capacity
/// eviction (oldest entry dropped when the count exceeds
/// <c>replayBufferCapacity</c>) and age eviction (entries older than
/// <c>replayRetentionSeconds</c> dropped on the next append or query), and is
@@ -83,6 +89,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private readonly string _sessionId;
private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
private readonly bool _singleSubscriberMode;
private readonly SubscriberOverflowHandler? _overflowHandler;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger<SessionEventDistributor> _logger;
@@ -134,7 +141,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
int subscriberQueueCapacity,
ILogger<SessionEventDistributor> logger,
SubscriberOverflowHandler? overflowHandler = null)
SubscriberOverflowHandler? overflowHandler = null,
bool singleSubscriberMode = true)
: this(
sessionId,
eventSourceFactory,
@@ -143,7 +151,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
replayRetentionSeconds: 0,
logger,
TimeProvider.System,
overflowHandler)
overflowHandler,
singleSubscriberMode)
{
}
@@ -181,6 +190,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// handler. When <see langword="null"/> (unit/skeleton use) the offending subscriber is
/// still disconnected but no metric/fault side effect runs.
/// </param>
/// <param name="singleSubscriberMode">
/// <see langword="true"/> when the owning session is in single-subscriber mode
/// (<c>AllowMultipleEventSubscribers == false</c>). This gates the FailFast
/// session-fault decision in <c>OnSubscriberOverflow</c>: an external subscriber that
/// overflows reports <c>isOnlySubscriber == true</c> (legacy FailFast faults the
/// session) ONLY in single-subscriber mode. In multi-subscriber mode it is always
/// <see langword="false"/>, so FailFast degrades to a per-subscriber disconnect and a
/// transient registration race can never falsely fault a shared session (Task 8;
/// resolves the Task 5 REVISIT race). Defaults to <see langword="true"/> so existing
/// call sites and unit tests keep legacy single-subscriber FailFast behavior.
/// </param>
public SessionEventDistributor(
string sessionId,
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
@@ -189,7 +209,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
double replayRetentionSeconds,
ILogger<SessionEventDistributor> logger,
TimeProvider timeProvider,
SubscriberOverflowHandler? overflowHandler = null)
SubscriberOverflowHandler? overflowHandler = null,
bool singleSubscriberMode = true)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
@@ -202,6 +223,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
_singleSubscriberMode = singleSubscriberMode;
_overflowHandler = overflowHandler;
_shutdownTimeout = DefaultShutdownTimeout;
_replayBufferCapacity = replayBufferCapacity;
@@ -214,7 +236,11 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
/// <summary>
/// Gets the count of currently-registered subscribers.
/// Gets the count of currently-registered subscribers. This count INCLUDES internal
/// subscribers (e.g. the gateway-owned dashboard mirror registered via
/// <c>Register(isInternal: true)</c>), and therefore differs from
/// <see cref="GatewaySession.ActiveEventSubscriberCount"/>, which tracks only external
/// (gRPC) subscribers and excludes the internal dashboard subscriber.
/// </summary>
public int SubscriberCount => _subscribers.Count;
@@ -261,31 +287,14 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// </param>
public IEventSubscriberLease Register(bool isInternal = false)
{
// The pump is the single writer for this channel; readers are single-consumer
// (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
//
// The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one
// slow reader can never stall the single pump that feeds every subscriber. FullMode
// is deliberately Wait — NOT because the pump ever blocks (it never calls the blocking
// WriteAsync overload), but because Wait is the only BoundedChannelFullMode under
// which TryWrite returns false when the channel is full. That false return IS the
// overflow signal the pump needs to apply the per-subscriber backpressure policy. The
// Drop* modes would make TryWrite silently succeed-and-drop, hiding overflow and
// re-introducing the silent data loss this task removes. So: Wait mode + TryWrite =
// a non-blocking pump that still detects a full subscriber channel.
Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Channel<MxEvent> channel = CreateSubscriberChannel();
long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel, isInternal);
return RegisterSubscriber(subscriber);
}
private IEventSubscriberLease RegisterSubscriber(Subscriber subscriber)
{
// The disposed check AND the map add happen under the same lock with no await
// in between. DisposeAsync sets _disposed=true under this same lock before it
// calls CompleteAllSubscribers, so once disposal has begun no further subscriber
@@ -294,7 +303,154 @@ public sealed class SessionEventDistributor : IAsyncDisposable
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
_subscribers[id] = subscriber;
_subscribers[subscriber.Id] = subscriber;
}
return new SubscriberLease(this, subscriber);
}
// Creates a per-subscriber bounded channel. The pump is the single writer; readers are
// single-consumer (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
//
// The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one slow
// reader can never stall the single pump that feeds every subscriber. FullMode is
// deliberately Wait — NOT because the pump ever blocks (it never calls the blocking
// WriteAsync overload), but because Wait is the only BoundedChannelFullMode under which
// TryWrite returns false when the channel is full. That false return IS the overflow signal
// the pump needs to apply the per-subscriber backpressure policy. The Drop* modes would
// make TryWrite silently succeed-and-drop, hiding overflow and re-introducing silent data
// loss. So: Wait mode + TryWrite = a non-blocking pump that still detects a full channel.
private Channel<MxEvent> CreateSubscriberChannel()
=> Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
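The interaction between `FullMode` and `TryWrite` described in the comment can be checked with a short stand-alone program. This is a sketch with a hypothetical helper name (`OverflowIsDetected`) and a capacity of 2 chosen for illustration; it uses only `System.Threading.Channels` calls.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper: fills a bounded channel past capacity and reports whether
// TryWrite signalled the overflow by returning false on the extra write.
static bool OverflowIsDetected(BoundedChannelFullMode fullMode)
{
    Channel<int> channel = Channel.CreateBounded<int>(
        new BoundedChannelOptions(2)
        {
            SingleReader = true,
            SingleWriter = true,
            FullMode = fullMode,
            AllowSynchronousContinuations = false,
        });

    bool first = channel.Writer.TryWrite(1);
    bool second = channel.Writer.TryWrite(2);
    bool third = channel.Writer.TryWrite(3); // the channel is already full here

    return first && second && !third;
}

bool waitDetects = OverflowIsDetected(BoundedChannelFullMode.Wait);
bool dropOldestDetects = OverflowIsDetected(BoundedChannelFullMode.DropOldest);

Console.WriteLine($"Wait detects overflow: {waitDetects}");
Console.WriteLine($"DropOldest detects overflow: {dropOldestDetects}");
```

In `Wait` mode the third `TryWrite` returns false without blocking, which gives the writer an explicit overflow signal. In `DropOldest` mode the same call returns true and an item is discarded, so the writer cannot tell that the reader fell behind.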
/// <summary>
/// Atomically snapshots the replay ring for events newer than
/// <paramref name="afterSequence"/> AND registers a live subscriber, so the
/// replay→live handoff has no gap and no duplicate (Task 12 reconnect/resume).
/// </summary>
/// <param name="afterSequence">
/// The last worker sequence the reconnecting client already observed. Replay returns
/// events strictly newer than this; the live channel is filtered (by the caller) to
/// events strictly newer than the last replayed sequence.
/// </param>
/// <param name="replayedEvents">
/// The retained events newer than <paramref name="afterSequence"/>, in ascending
/// sequence order. Never null; empty when nothing newer is retained.
/// </param>
/// <param name="gap">
/// <see langword="true"/> when events between <paramref name="afterSequence"/> and the
/// oldest retained event were already evicted (capacity/age), so the client missed
/// events that can no longer be replayed and must re-snapshot. Mirrors
/// <see cref="TryGetReplayFrom"/> gap semantics.
/// </param>
/// <param name="oldestAvailableSequence">
/// The oldest worker sequence still retained and replayable. <c>0</c> when nothing is
/// retained. Meaningful to the caller only when <paramref name="gap"/> is
/// <see langword="true"/> (it populates the ReplayGap sentinel's
/// <c>oldest_available_sequence</c>).
/// </param>
/// <param name="liveResumeSequence">
/// The worker sequence the live channel must resume strictly after: the highest
/// replayed sequence, or <paramref name="afterSequence"/> when nothing was replayed.
/// The caller MUST apply this as the per-subscriber live filter so any event that was
/// both replayed here and subsequently fanned into this subscriber's live channel is
/// dropped exactly once (no duplicate), while every newer event is delivered (no gap).
/// </param>
/// <param name="isInternal">
/// <see langword="true"/> for a gateway-owned internal subscriber. See
/// <see cref="Register"/>.
/// </param>
/// <remarks>
/// <para>
/// <b>Why this is atomic and the handoff is correct.</b> The replay snapshot and the
/// subscriber registration both run inside the SAME <c>_replayLock</c> critical
/// section. The pump appends each event to the replay buffer under <c>_replayLock</c>
/// <em>before</em> fanning it to subscribers (outside the lock). Therefore, relative
/// to this method's critical section, for every event E:
/// </para>
/// <list type="bullet">
/// <item>
/// If the pump appended E before this critical section, E is in
/// <paramref name="replayedEvents"/> (when newer than
/// <paramref name="afterSequence"/>). The pump's fan-out of E may race the
/// registration: if it writes E to this new channel too, E's sequence is
/// <c>&lt;= liveResumeSequence</c>, so the caller's live filter DROPS it — no
/// duplicate.
/// </item>
/// <item>
/// If the pump appends E after this critical section, E is NOT in the snapshot,
/// but this subscriber is already registered, so the pump fans E into the live
/// channel with sequence <c>&gt; liveResumeSequence</c> — delivered as live, no
/// gap.
/// </item>
/// </list>
/// <para>
/// Lock ordering: this is the only path that holds both <c>_replayLock</c> and
/// <c>_lifecycleLock</c>; it always takes <c>_replayLock</c> first then
/// <c>_lifecycleLock</c>. No other path acquires both, so there is no inversion.
/// </para>
/// </remarks>
public IEventSubscriberLease RegisterWithReplay(
ulong afterSequence,
out IReadOnlyList<MxEvent> replayedEvents,
out bool gap,
out ulong oldestAvailableSequence,
out ulong liveResumeSequence,
bool isInternal = false)
{
Channel<MxEvent> channel = CreateSubscriberChannel();
long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel, isInternal);
// Snapshot replay AND register under a single _replayLock section so the live channel
// begins exactly where the replay snapshot ends — see the remarks for the no-gap /
// no-duplicate argument. _lifecycleLock is nested inside (consistent ordering) only to
// honor the disposed check and the same add semantics as Register.
lock (_replayLock)
{
EvictAged();
List<MxEvent> newer = [];
ulong highestReplayed = afterSequence;
if (_replayBuffer.Count == 0)
{
gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
oldestAvailableSequence = 0; // meaningful only when gap == true; 0 here since nothing is retained
}
else
{
ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;
gap = oldestRetained > 0 && afterSequence < oldestRetained - 1;
// Per the contract on OldestAvailableSequence: meaningful only when gap == true.
oldestAvailableSequence = gap ? oldestRetained : 0;
foreach (ReplayEntry entry in _replayBuffer)
{
if (entry.Event.WorkerSequence > afterSequence)
{
newer.Add(entry.Event);
highestReplayed = entry.Event.WorkerSequence;
}
}
}
replayedEvents = newer;
liveResumeSequence = highestReplayed;
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
_subscribers[id] = subscriber;
}
}
return new SubscriberLease(this, subscriber);
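Reviewer note: the remarks on `RegisterWithReplay` leave the live filter to the caller. A minimal sketch of that caller-side step follows, assuming events are reduced to their worker sequence numbers; `ResumeSketch` and `Deliver` are hypothetical names and are not part of this change.

```csharp
using System;
using System.Collections.Generic;

// Replay returned sequences 6 and 7. The pump's racing fan-out also wrote 7 to the live channel.
List<ulong> delivered = ResumeSketch.Deliver(
    replayed: new ulong[] { 6, 7 },
    live: new ulong[] { 7, 8, 9 },
    liveResumeSequence: 7);
Console.WriteLine(string.Join(",", delivered)); // 6,7,8,9

static class ResumeSketch
{
    // Emits the replay snapshot first, then only live events strictly newer than liveResumeSequence.
    public static List<ulong> Deliver(
        IReadOnlyList<ulong> replayed,
        IEnumerable<ulong> live,
        ulong liveResumeSequence)
    {
        List<ulong> output = new(replayed);
        foreach (ulong sequence in live)
        {
            if (sequence <= liveResumeSequence)
            {
                continue; // already delivered through replay: drop the duplicate
            }

            output.Add(sequence);
        }

        return output;
    }
}
```

When nothing was replayed, `liveResumeSequence` equals `afterSequence`, so the same filter drops only events the client had already observed before reconnecting.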
@@ -416,28 +572,25 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// slow consumer must not fault a session shared by other healthy subscribers.
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
{
// Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives
// the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session
// when the overflowing subscriber is the sole subscriber.
// Decide whether FailFast may fault the whole session for this overflow. This is the
// "isOnlySubscriber" signal the legacy single-subscriber FailFast path keys on.
//
// This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced
// by the validator and the single-subscriber guard in AttachEventSubscriber — a
// concurrent second registration is impossible, so the false-FailFast race (two
// subscribers, one overflows, Count reads as 1 after the other concurrently unregisters,
// FailFast wrongly faults the session) cannot occur today.
// Task 8 resolution of the Task 5/7 REVISIT race: gate this on the SESSION MODE
// (_singleSubscriberMode), NOT on a live count snapshot. The old
// `CountExternalSubscribers() == 1` snapshot raced once multi-subscriber became real —
// a concurrent second registration/unregistration could make the count read as 1 with
// two subscribers actually present, producing a false FailFast that faults a shared
// session. The mode is fixed for the session's lifetime, so reading it is race-free:
// - single-subscriber mode: at most one external subscriber can ever exist (the
// AttachEventSubscriber guard enforces it), so an overflowing external subscriber
// IS the sole subscriber — preserve the legacy FailFast session-fault behavior.
// - multi-subscriber mode: never fault the shared session; FailFast degrades to a
// per-subscriber disconnect so one slow consumer cannot punish healthy ones.
//
// REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the
// race window opens — a concurrent second registration could cause Count to read as 1
// here even with two subscribers, producing a false FailFast that faults a shared
// session. Resolve before enabling multi-subscriber.
//
// Task 6: the gateway-owned internal dashboard subscriber is excluded from this
// accounting. (a) An internal subscriber that overflows is NEVER the "only subscriber"
// — a slow/broken dashboard must never fault the session, only disconnect its own
// mirror. (b) Internal subscribers are excluded from the count, so a lone external
// gRPC subscriber still reports isOnlySubscriber==true and preserves the legacy
// FailFast session-fault behavior even while the dashboard mirror is attached.
bool isOnlySubscriber = !subscriber.IsInternal && CountExternalSubscribers() == 1;
// Task 6: the gateway-owned internal dashboard subscriber is excluded — an internal
// subscriber that overflows is NEVER the "only subscriber", so a slow/broken dashboard
// can only disconnect its own mirror and never fault the session.
bool isOnlySubscriber = !subscriber.IsInternal && _singleSubscriberMode;
_logger.LogDebug(
"Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
@@ -473,22 +626,6 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
}
// Counts external (non-internal) subscribers. Drives the isOnlySubscriber FailFast
// decision so the gateway-owned internal dashboard subscriber never inflates the count.
private int CountExternalSubscribers()
{
int count = 0;
foreach (Subscriber subscriber in _subscribers.Values)
{
if (!subscriber.IsInternal)
{
count++;
}
}
return count;
}
private void CompleteAllSubscribers(Exception? error)
{
foreach (Subscriber subscriber in _subscribers.Values)
@@ -38,13 +38,24 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// EventsHub group regardless of whether a gRPC client is streaming. When null
/// (unit tests that don't exercise the dashboard mirror) no mirror is started.
/// </param>
/// <param name="AllowMultipleEventSubscribers">
/// The session's effective multi-subscriber mode (Task 8). Carried here so the session
/// can pass it to its <see cref="SessionEventDistributor"/> at construction — the
/// distributor is created at <c>MarkReady</c> (for the dashboard mirror) before any gRPC
/// subscriber attaches, so the mode cannot be learned from a later
/// <c>AttachEventSubscriber</c> call. The distributor gates its FailFast session-fault
/// decision on this mode (single-subscriber only) instead of a live count snapshot,
/// closing the Task 5 false-FailFast race. Defaults to <see langword="false"/>
/// (single-subscriber) so existing call sites and unit tests are unchanged.
/// </param>
public sealed record SessionEventStreaming(
MxAccessGrpcMapper Mapper,
EventOptions EventOptions,
ILogger<SessionEventDistributor> DistributorLogger,
TimeProvider TimeProvider,
GatewayMetrics Metrics,
IDashboardEventBroadcaster? DashboardBroadcaster = null)
IDashboardEventBroadcaster? DashboardBroadcaster = null,
bool AllowMultipleEventSubscribers = false)
{
/// <summary>
/// Defaults used when a session is constructed without explicit streaming
@@ -17,6 +17,7 @@ public sealed class SessionManager : ISessionManager
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
public const string DetachGraceExpiredReason = "detach-grace-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
@@ -295,12 +296,37 @@ public sealed class SessionManager : ISessionManager
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
// A session is swept when its normal lease has expired OR its detach-grace
// retention window has elapsed (last external subscriber dropped and no client
// reconnected within DetachGraceSeconds). The detach-grace close is the same
// teardown as a lease-expiry close; only the reason differs so operators can tell
// a short reconnect-window expiry from a long idle-lease expiry in logs/metrics.
// Lease-expiry takes PRECEDENCE over detach-grace when both conditions fire
// simultaneously (reason will be lease-expired, not detach-grace-expired).
//
// TOCTOU note: eligibility is re-verified atomically inside TryBeginCloseIfExpired
// under _syncRoot, so a client that reattaches a subscriber between the check below
// and the close call wins the race and the session is left open and usable.
string? reason = session.IsLeaseExpired(now)
? LeaseExpiredReason
: session.IsDetachGraceExpired(now)
? DetachGraceExpiredReason
: null;
if (reason is null)
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
// Re-verify eligibility atomically and begin the Closing transition before
// delegating to CloseSessionCoreAsync. If a subscriber reattached between the
// IsLeaseExpired/IsDetachGraceExpired check above and here, TryBeginCloseIfExpired
// returns false and we skip this session (it is no longer expired). A false return
// with alreadyClosing set is NOT skipped: it still falls through to CloseSessionCoreAsync.
if (!session.TryBeginCloseIfExpired(now, out bool alreadyClosing) && !alreadyClosing)
{
continue;
}
await CloseSessionCoreAsync(session, reason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
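Reviewer note: the check-then-recheck pattern in the sweep loop can be sketched in isolation. The class below is a toy model, not the real `GatewaySession`: it stands in a lease renewal for the subscriber reattach, and every member body is an assumption made for illustration.

```csharp
using System;

DateTimeOffset start = DateTimeOffset.UnixEpoch;
SketchSession session = new(leaseExpiresAt: start.AddSeconds(30));
DateTimeOffset now = start.AddSeconds(31);

Console.WriteLine(session.IsLeaseExpired(now)); // True: the first, non-atomic check passes
session.Renew(now.AddSeconds(30));              // a client wins the race at this point
Console.WriteLine(session.TryBeginCloseIfExpired(now, out bool alreadyClosing)); // False
Console.WriteLine(alreadyClosing);              // False: the sweep skips this session

sealed class SketchSession
{
    private readonly object _syncRoot = new();
    private DateTimeOffset _leaseExpiresAt;
    private bool _closing;

    public SketchSession(DateTimeOffset leaseExpiresAt) => _leaseExpiresAt = leaseExpiresAt;

    public bool IsLeaseExpired(DateTimeOffset now)
    {
        lock (_syncRoot) { return now >= _leaseExpiresAt; }
    }

    public void Renew(DateTimeOffset until)
    {
        lock (_syncRoot) { if (!_closing) { _leaseExpiresAt = until; } }
    }

    // Re-checks expiry and flips to closing under one lock, so no renewal can slip in between.
    public bool TryBeginCloseIfExpired(DateTimeOffset now, out bool alreadyClosing)
    {
        lock (_syncRoot)
        {
            alreadyClosing = _closing;
            if (_closing || now < _leaseExpiresAt) { return false; }
            _closing = true;
            return true;
        }
    }
}
```

The first check only selects candidates; the decision that matters is the one made while holding the lock.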
@@ -461,7 +487,8 @@ public sealed class SessionManager : ISessionManager
_distributorLogger,
_timeProvider,
_metrics,
_dashboardEventBroadcaster);
_dashboardEventBroadcaster,
_options.Sessions.AllowMultipleEventSubscribers);
return new GatewaySession(
sessionId,
@@ -477,7 +504,9 @@ public sealed class SessionManager : ISessionManager
shutdownTimeout,
leaseDuration,
openedAt,
eventStreaming);
eventStreaming,
TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)),
TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)));
}
private static string CreateClientCorrelationId(
@@ -5,6 +5,7 @@ public enum SessionManagerErrorCode
SessionNotFound,
SessionNotReady,
EventSubscriberAlreadyActive,
EventSubscriberLimitReached,
EventQueueOverflow,
SessionLimitExceeded,
OpenFailed,
@@ -46,6 +46,7 @@
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"DetachGraceSeconds": 30,
"AllowMultipleEventSubscribers": false,
"MaxEventSubscribersPerSession": 8
},
@@ -34,8 +34,10 @@ public sealed class GatewayOptionsTests
Assert.Equal(128, options.Sessions.MaxPendingCommandsPerSession);
Assert.Equal(1800, options.Sessions.DefaultLeaseSeconds);
Assert.Equal(30, options.Sessions.LeaseSweepIntervalSeconds);
Assert.Equal(30, options.Sessions.DetachGraceSeconds);
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
Assert.Equal(8, options.Sessions.MaxEventSubscribersPerSession);
Assert.Equal(0, options.Sessions.WorkerReadyWaitTimeoutMs);
Assert.Equal(10_000, options.Events.QueueCapacity);
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
@@ -86,6 +88,8 @@ public sealed class GatewayOptionsTests
[InlineData("MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds", "0", "MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.")]
[InlineData("MxGateway:Sessions:DefaultLeaseSeconds", "0", "MxGateway:Sessions:DefaultLeaseSeconds must be greater than zero.")]
[InlineData("MxGateway:Sessions:LeaseSweepIntervalSeconds", "0", "MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.")]
[InlineData("MxGateway:Sessions:DetachGraceSeconds", "-1", "MxGateway:Sessions:DetachGraceSeconds must be zero or greater (0 disables detach-grace retention).")]
[InlineData("MxGateway:Sessions:WorkerReadyWaitTimeoutMs", "-1", "MxGateway:Sessions:WorkerReadyWaitTimeoutMs must be greater than or equal to zero.")]
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
[InlineData("MxGateway:Protocol:MaxGrpcMessageBytes", "0", "MxGateway:Protocol:MaxGrpcMessageBytes must be between")]
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
@@ -119,6 +123,18 @@ public sealed class GatewayOptionsTests
StringComparison.Ordinal);
}
[Fact]
public void DashboardOptions_DisableLogin_DefaultsToFalse()
{
Assert.False(new DashboardOptions().DisableLogin);
}
[Fact]
public void DashboardOptions_AutoLoginUser_DefaultsToNull()
{
Assert.Null(new DashboardOptions().AutoLoginUser);
}
private static GatewayOptions BindOptions(IReadOnlyDictionary<string, string?> configurationValues)
{
using ServiceProvider services = BuildServices(configurationValues);
@@ -356,4 +356,41 @@ public sealed class GatewayOptionsValidatorTests
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
// -------------------------------------------------------------------------
// WorkerReadyWaitTimeoutMs validation
// -------------------------------------------------------------------------
[Fact]
public void Validate_Fails_WhenWorkerReadyWaitTimeoutMsIsNegative()
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { WorkerReadyWaitTimeoutMs = -1 });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Failed);
Assert.Contains(
result.Failures!,
f => f.Contains("MxGateway:Sessions:WorkerReadyWaitTimeoutMs"));
}
[Fact]
public void Validate_Succeeds_WhenWorkerReadyWaitTimeoutMsIsZero()
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { WorkerReadyWaitTimeoutMs = 0 });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
[Fact]
public void Validate_Succeeds_WhenWorkerReadyWaitTimeoutMsIsPositive()
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { WorkerReadyWaitTimeoutMs = 5000 });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
}
@@ -75,6 +75,35 @@ public sealed class GalaxyHierarchyIndexTests
}
}
/// <summary>
/// Verifies an object whose parent is absent from the set (e.g. a deleted/undeployed
/// container area) is re-rooted under sentinel 0 rather than vanishing under a phantom
/// parent id that browse never reaches from the root.
/// </summary>
[Fact]
public void ChildrenByParent_OrphanWithMissingParent_AppearsAsRoot()
{
GalaxyObject realRoot = new() { GobjectId = 1, ParentGobjectId = 0, IsArea = true, ContainedName = "RealRoot" };
GalaxyObject orphanArea = new() { GobjectId = 2, ParentGobjectId = 5008, IsArea = true, ContainedName = "Orphan" };
GalaxyObject orphanChild = new() { GobjectId = 3, ParentGobjectId = 2, ContainedName = "Child" };
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build([realRoot, orphanArea, orphanChild]);
// Both the real root and the orphan (its parent 5008 is absent) surface under root.
Assert.True(index.ChildrenByParent.TryGetValue(0, out IReadOnlyList<GalaxyObjectView>? roots));
Assert.NotNull(roots);
Assert.Contains(roots!, view => view.Object.GobjectId == 1);
Assert.Contains(roots!, view => view.Object.GobjectId == 2);
// The orphan keeps its own deployed children nested beneath it.
Assert.True(index.ChildrenByParent.TryGetValue(2, out IReadOnlyList<GalaxyObjectView>? underOrphan));
Assert.Single(underOrphan!);
Assert.Equal(3, underOrphan![0].Object.GobjectId);
// Nothing buckets under the phantom parent id.
Assert.False(index.ChildrenByParent.ContainsKey(5008));
}
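Reviewer note: the re-rooting behavior this test pins down can be sketched as below. This is an assumed shape, not the actual `GalaxyHierarchyIndex.Build` implementation; objects are reduced to `(Id, ParentId)` tuples and `HierarchySketch` is a hypothetical name.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var objects = new (int Id, int ParentId)[] { (1, 0), (2, 5008), (3, 2) };
Dictionary<int, List<int>> children = HierarchySketch.BuildChildrenByParent(objects);

Console.WriteLine(string.Join(",", children[0])); // 1,2
Console.WriteLine(string.Join(",", children[2])); // 3
Console.WriteLine(children.ContainsKey(5008));    // False

static class HierarchySketch
{
    public const int RootSentinel = 0;

    // Buckets each object under its parent; a parent id absent from the set maps to the root sentinel.
    public static Dictionary<int, List<int>> BuildChildrenByParent(
        IReadOnlyCollection<(int Id, int ParentId)> objects)
    {
        HashSet<int> knownIds = objects.Select(o => o.Id).ToHashSet();
        Dictionary<int, List<int>> byParent = new();

        foreach ((int id, int parentId) in objects)
        {
            int effectiveParent = knownIds.Contains(parentId) ? parentId : RootSentinel;
            if (!byParent.TryGetValue(effectiveParent, out List<int>? bucket))
            {
                bucket = new List<int>();
                byParent[effectiveParent] = bucket;
            }

            bucket.Add(id);
        }

        return byParent;
    }
}
```

The sketch does not handle cycles or self-parented objects; it only shows why nothing is ever bucketed under a phantom parent id.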
/// <summary>Verifies <see cref="GalaxyHierarchyIndex.ObjectViewsByTagName"/> is OrdinalIgnoreCase and supports O(1) lookups.</summary>
[Fact]
public void ObjectViewsByTagName_IsCaseInsensitive_AndLookupsAreO1()
@@ -0,0 +1,47 @@
using System.Security.Claims;
using ZB.MOM.WW.MxGateway.Server.Dashboard;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardAutoLoginAuthenticationHandlerTests
{
[Fact]
public void CreatePrincipal_MintsAuthenticatedMultiRoleUser()
{
ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal("multi-role");
Assert.True(principal.Identity!.IsAuthenticated);
Assert.Equal("multi-role", principal.Identity!.Name);
Assert.True(principal.IsInRole(DashboardRoles.Admin));
Assert.True(principal.IsInRole(DashboardRoles.Viewer));
}
[Theory]
[InlineData(null)]
[InlineData("")]
[InlineData(" ")]
public void CreatePrincipal_BlankUser_FallsBackToDefault(string? user)
{
ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal(user);
Assert.Equal(DashboardAutoLoginAuthenticationHandler.DefaultUser, principal.Identity!.Name);
}
[Fact]
public void CreatePrincipal_TrimsUser()
{
ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal(" multi-role ");
Assert.Equal("multi-role", principal.Identity!.Name);
}
[Fact]
public void CreatePrincipal_CustomUser_PreservesNameAndRoles()
{
ClaimsPrincipal principal = DashboardAutoLoginAuthenticationHandler.CreatePrincipal("gw-viewer");
Assert.Equal("gw-viewer", principal.Identity!.Name);
Assert.True(principal.IsInRole(DashboardRoles.Admin));
Assert.True(principal.IsInRole(DashboardRoles.Viewer));
}
}
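Reviewer note: for readers without the handler source at hand, the behavior these tests assert (trim, blank fallback, both roles) could look roughly like the sketch below. The role strings, the `DefaultUser` value, and the authentication type are placeholders, not the values used by `DashboardAutoLoginAuthenticationHandler`.

```csharp
#nullable enable
using System;
using System.Security.Claims;

ClaimsPrincipal principal = AutoLoginSketch.CreatePrincipal("  multi-role  ");
Console.WriteLine(principal.Identity!.Name);            // multi-role
Console.WriteLine(principal.Identity.IsAuthenticated);  // True
Console.WriteLine(principal.IsInRole("Admin"));         // True

static class AutoLoginSketch
{
    public const string DefaultUser = "auto-login"; // placeholder default, assumed for this sketch

    public static ClaimsPrincipal CreatePrincipal(string? user)
    {
        // Blank or null input falls back to the default; anything else is trimmed.
        string name = string.IsNullOrWhiteSpace(user) ? DefaultUser : user.Trim();

        // A non-empty authentication type is what makes the identity report IsAuthenticated.
        ClaimsIdentity identity = new(
            new[]
            {
                new Claim(ClaimTypes.Name, name),
                new Claim(ClaimTypes.Role, "Admin"),
                new Claim(ClaimTypes.Role, "Viewer"),
            },
            authenticationType: "AutoLoginSketch");

        return new ClaimsPrincipal(identity);
    }
}
```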
@@ -0,0 +1,59 @@
using System.Security.Claims;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.MxGateway.Server;
using ZB.MOM.WW.MxGateway.Server.Dashboard;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardDisableLoginTests
{
[Fact]
public async Task DisableLoginOff_CookieSchemeUsesCookieHandler()
{
await using WebApplication app = GatewayApplication.Build([]);
IAuthenticationSchemeProvider provider =
app.Services.GetRequiredService<IAuthenticationSchemeProvider>();
AuthenticationScheme? scheme = await provider.GetSchemeAsync(
DashboardAuthenticationDefaults.AuthenticationScheme);
Assert.NotNull(scheme);
Assert.Equal(typeof(CookieAuthenticationHandler), scheme!.HandlerType);
}
[Fact]
public async Task DisableLoginOn_CookieSchemeUsesAutoLoginHandler()
{
await using WebApplication app = GatewayApplication.Build(
["--MxGateway:Dashboard:DisableLogin=true"]);
IAuthenticationSchemeProvider provider =
app.Services.GetRequiredService<IAuthenticationSchemeProvider>();
AuthenticationScheme? scheme = await provider.GetSchemeAsync(
DashboardAuthenticationDefaults.AuthenticationScheme);
Assert.NotNull(scheme);
Assert.Equal(typeof(DashboardAutoLoginAuthenticationHandler), scheme!.HandlerType);
}
[Fact]
public async Task DisableLoginOn_AutoLoginPrincipalSatisfiesAdminAndViewerPolicies()
{
await using WebApplication app = GatewayApplication.Build(
["--MxGateway:Dashboard:DisableLogin=true"]);
IAuthorizationService authorization =
app.Services.GetRequiredService<IAuthorizationService>();
ClaimsPrincipal user = DashboardAutoLoginAuthenticationHandler.CreatePrincipal("multi-role");
Assert.True((await authorization.AuthorizeAsync(
user, resource: null, DashboardAuthenticationDefaults.AdminPolicy)).Succeeded);
Assert.True((await authorization.AuthorizeAsync(
user, resource: null, DashboardAuthenticationDefaults.ViewerPolicy)).Succeeded);
Assert.True((await authorization.AuthorizeAsync(
user, resource: null, DashboardAuthenticationDefaults.HubClientsPolicy)).Succeeded);
}
}
@@ -533,63 +533,4 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>
/// Gets the process identifier.
/// </summary>
public int Id { get; } = processId;
/// <summary>
/// Gets a value indicating whether the process has exited.
/// </summary>
public bool HasExited { get; private set; }
/// <summary>
/// Gets the exit code of the process.
/// </summary>
public int? ExitCode { get; private set; }
/// <summary>
/// Waits for the process to exit asynchronously. Completes only when <see cref="Kill"/>
/// or <see cref="MarkExited"/> has been called, so callers that observe completion can
/// trust that exit actually happened (e.g., via the worker shutdown-ack path).
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task that completes when the process has actually exited.</returns>
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
}
/// <summary>
/// Terminates the process.
/// </summary>
/// <param name="entireProcessTree">Whether to kill the entire process tree.</param>
public void Kill(bool entireProcessTree)
{
MarkExited(-1);
}
/// <summary>
/// Releases resources used by this process.
/// </summary>
public void Dispose()
{
}
/// <summary>
/// Marks the process as exited with the specified exit code.
/// </summary>
/// <param name="exitCode">The process exit code.</param>
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
}
@@ -0,0 +1,743 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Security.Authentication;
using ZB.MOM.WW.MxGateway.Server.Security.Authorization;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway;
/// <summary>
/// End-to-end multi-subscriber fan-out tests through the real gRPC StreamEvents path via
/// the fake worker harness. Covers fan-out to two concurrent subscribers, independent
/// cancellation isolation, and the per-session subscriber cap.
/// </summary>
public sealed class GatewayEndToEndMultiSubscriberTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10);
private const int ServerHandle = 3001;
private const int ItemHandle = 4002;
private const int EventCount = 3;
/// <summary>
/// Two concurrent StreamEvents RPCs on one session both receive every worker event
/// the fake worker emits, in order.
/// </summary>
[Fact]
public async Task StreamEvents_TwoSubscribers_BothReceiveAllEvents()
{
MultiEventFakeWorkerProcessLauncher launcher = new(EventCount);
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-fanout",
ClientCorrelationId = "open-fanout",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
// Attach both event streams before triggering the Advise that causes events.
// This guarantees both subscribers are registered with the distributor before
// the worker emits anything.
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// The streams block internally until the session starts emitting; start them on the
// thread pool so the test can proceed to trigger the Advise.
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext()));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Wait until both subscribers are registered before issuing the Advise that
// triggers events. Polling ActiveEventSubscriberCount on the live GatewaySession
// is deterministic: we can't race past this point until the production code has
// actually incremented the counter.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Both writers must receive all events.
IReadOnlyList<MxEvent> events1 = await writer1.WaitForMessageCountAsync(EventCount, TestTimeout);
IReadOnlyList<MxEvent> events2 = await writer2.WaitForMessageCountAsync(EventCount, TestTimeout);
// Close the session, which completes both stream tasks.
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-fanout" },
new TestServerCallContext());
await stream1Task.WaitAsync(TestTimeout);
await stream2Task.WaitAsync(TestTimeout);
await launcher.WorkerTask.WaitAsync(TestTimeout);
// Both subscribers must have received all events.
Assert.Equal(EventCount, events1.Count);
Assert.Equal(EventCount, events2.Count);
// Events must arrive in the same order on both subscribers.
for (int i = 0; i < EventCount; i++)
{
Assert.Equal(MxEventFamily.OnDataChange, events1[i].Family);
Assert.Equal(MxEventFamily.OnDataChange, events2[i].Family);
// Sequence numbers must match between the two subscribers (same fan-out order).
Assert.Equal(events1[i].WorkerSequence, events2[i].WorkerSequence);
// Sequences must be strictly ascending across events.
if (i > 0)
{
Assert.True(events1[i].WorkerSequence > events1[i - 1].WorkerSequence);
}
Assert.Equal($"scripted-value-{i + 1}", events1[i].Value.StringValue);
Assert.Equal($"scripted-value-{i + 1}", events2[i].Value.StringValue);
}
}
/// <summary>
/// When one of two subscribers cancels its stream, the other subscriber continues
/// to receive subsequent events and the session remains usable.
/// </summary>
[Fact]
public async Task StreamEvents_OneSubscriberCancels_OtherContinuesReceivingEvents()
{
GatedEventFakeWorkerProcessLauncher launcher = new();
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-cancel",
ClientCorrelationId = "open-cancel",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
using CancellationTokenSource sub1Cts = new();
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// Sub1 uses a CancellationToken we can cancel independently.
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext(cancellationToken: sub1Cts.Token)));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Wait until both subscribers are registered before issuing Advise.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session: Register + AddItem + Advise.
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Emit first event; both subscribers should see it.
launcher.AllowNextEvent();
await writer1.WaitForFirstMessageAsync(TestTimeout);
await writer2.WaitForFirstMessageAsync(TestTimeout);
// Cancel sub1 and wait for it to finish.
await sub1Cts.CancelAsync();
try
{
// Awaiting stream1Task here is load-bearing: it ensures EventStreamService's
// finally block runs subscriber.Dispose() → sub1 is fully unregistered before
// AllowNextEvent() below fans the 2nd event. Reordering these two lines would
// let the 2nd event reach sub1 before it is removed, breaking the assertion
// that sub1 received exactly one event.
await stream1Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
{
// Expected: the iterator surfaces the cancellation.
}
catch (RpcException rpc) when (rpc.StatusCode == StatusCode.Cancelled)
{
// Also acceptable depending on gRPC exception wrapping.
}
// Emit a second event — only sub2 should see it.
launcher.AllowNextEvent();
await writer2.WaitForMessageCountAsync(2, TestTimeout);
// Sub1 must not have received the second event.
Assert.Single(writer1.Messages);
Assert.Equal(2, writer2.Messages.Count);
Assert.Equal(MxEventFamily.OnDataChange, writer2.Messages[1].Family);
// Tear down: signal the worker to stop emitting, then close the session.
launcher.StopEmitting();
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cancel" },
new TestServerCallContext());
try
{
await stream2Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
{
// Acceptable: closing the session may surface on the stream as a cancellation.
}
await launcher.WorkerTask.WaitAsync(TestTimeout);
}
/// <summary>
/// With <c>MaxEventSubscribersPerSession=2</c> a third concurrent StreamEvents call is
/// rejected with gRPC status <see cref="StatusCode.ResourceExhausted"/> while the first
/// two subscribers keep streaming.
/// </summary>
[Fact]
public async Task StreamEvents_ThirdSubscriberWhenCapIsTwo_ReceivesResourceExhausted()
{
const int cap = 2;
GatedEventFakeWorkerProcessLauncher launcher = new();
// AllowMultipleEventSubscribers=true is required: the cap rejection path
// (EventSubscriberLimitReached) is only reachable when multi-subscriber mode is on.
// Without it the service hits EventSubscriberAlreadyActive instead — also
// ResourceExhausted, but for the wrong reason — masking a misconfiguration.
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-cap",
ClientCorrelationId = "open-cap",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// Attach both streams first (before any events are emitted).
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext()));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Wait until both subscribers are registered before emitting or adding a third.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session so the worker is ready.
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Emit one event and confirm both attached streams see it.
launcher.AllowNextEvent();
await writer1.WaitForFirstMessageAsync(TestTimeout);
await writer2.WaitForFirstMessageAsync(TestTimeout);
// A third subscriber must be rejected with ResourceExhausted.
RecordingServerStreamWriter<MxEvent> writer3 = new();
RpcException capException = await Assert.ThrowsAsync<RpcException>(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer3,
new TestServerCallContext()));
Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode);
// Confirm this is the cap rejection, not the single-subscriber rejection
// (EventSubscriberAlreadyActive). The production message for
// EventSubscriberLimitReached contains "maximum".
Assert.Contains("maximum", capException.Status.Detail, StringComparison.OrdinalIgnoreCase);
// The first two streams must still be live (not completed).
Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected.");
Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected.");
// Tear down.
launcher.StopEmitting();
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cap" },
new TestServerCallContext());
await stream1Task.WaitAsync(TestTimeout);
await stream2Task.WaitAsync(TestTimeout);
await launcher.WorkerTask.WaitAsync(TestTimeout);
}
// ---- helpers ----
private static MxCommandRequest CreateRegisterRequest(string sessionId) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "register-ms",
Command = new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = "multi-sub-e2e-client" },
},
};
private static MxCommandRequest CreateAddItemRequest(string sessionId, int serverHandle) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "add-item-ms",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = "Galaxy.Tag.Value",
},
},
};
private static MxCommandRequest CreateAdviseRequest(
string sessionId,
int serverHandle,
int itemHandle) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "advise-ms",
Command = new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle },
},
};
// ---- shared fake worker helpers ----
/// <summary>
/// Populates <paramref name="reply"/> with the scripted response for <paramref name="kind"/>.
/// Shared by both fake worker launchers so the reply logic is not duplicated.
/// </summary>
private static void ConfigureCommandReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
// ---- fixture ----
/// <summary>
/// Gateway service fixture with <c>AllowMultipleEventSubscribers=true</c> and a
/// configurable per-session subscriber cap.
/// </summary>
private sealed class MultiSubscriberGatewayServiceFixture : IAsyncDisposable
{
private readonly GatewayMetrics _metrics = new();
private readonly SessionRegistry _registry = new();
public MultiSubscriberGatewayServiceFixture(
IWorkerProcessLauncher launcher,
int maxEventSubscribersPerSession = 8)
{
IOptions<GatewayOptions> options = Options.Create(CreateOptions(maxEventSubscribersPerSession));
SessionWorkerClientFactory workerClientFactory = new(
launcher,
options,
_metrics,
NullLoggerFactory.Instance);
SessionManager sessionManager = new(
_registry,
workerClientFactory,
options,
_metrics,
logger: NullLogger<SessionManager>.Instance,
dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance);
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
_metrics);
Service = new MxAccessGatewayService(
sessionManager,
new GatewayRequestIdentityAccessor(),
new AllowAllConstraintEnforcer(),
new MxAccessGrpcRequestValidator(),
mapper,
eventStreamService,
_metrics,
NullLogger<MxAccessGatewayService>.Instance,
new FakeGatewayAlarmService());
}
public MxAccessGatewayService Service { get; }
/// <summary>
/// Polls <see cref="GatewaySession.ActiveEventSubscriberCount"/> for the session
/// identified by <paramref name="sessionId"/> until it reaches <paramref name="n"/>,
/// bounded by <paramref name="timeout"/>. Fails the test if the count is not reached
/// within the deadline.
/// </summary>
/// <remarks>
/// This is the deterministic gate that replaces <c>Task.Delay</c> before Advise
/// calls: it proves the production code has registered each subscriber before we
/// fan any events.
/// </remarks>
public async Task WaitForSubscriberCountAsync(string sessionId, int n, TimeSpan timeout)
{
using CancellationTokenSource deadlineCts = new(timeout);
while (true)
{
if (_registry.TryGet(sessionId, out GatewaySession? session)
&& session.ActiveEventSubscriberCount >= n)
{
return;
}
if (deadlineCts.IsCancellationRequested)
{
int actual = _registry.TryGet(sessionId, out GatewaySession? s)
? s.ActiveEventSubscriberCount
: -1;
Assert.Fail(
$"Timed out waiting for {n} event subscriber(s) on session {sessionId}. "
+ $"Actual count after {timeout.TotalSeconds:0.#}s: {actual}.");
}
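// Delay without the deadline token: a cancelled delay would throw before the
// descriptive Assert.Fail above could run. The deadline is re-checked each pass.
await Task.Delay(millisecondsDelay: 5).ConfigureAwait(false);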
}
}
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
{
await session.DisposeAsync();
}
_metrics.Dispose();
}
private static GatewayOptions CreateOptions(int maxEventSubscribersPerSession) =>
new()
{
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 5,
ShutdownTimeoutSeconds = 5,
HeartbeatIntervalSeconds = 30,
HeartbeatGraceSeconds = 30,
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
},
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 5,
MaxSessions = 4,
AllowMultipleEventSubscribers = true,
MaxEventSubscribersPerSession = maxEventSubscribersPerSession,
},
Events = new EventOptions
{
QueueCapacity = 32,
},
};
}
// ---- fake worker launchers ----
/// <summary>
/// Fake worker that emits a fixed number of distinct OnDataChange events after an Advise
/// command, then waits for shutdown. Events carry an indexed string value so the test can
/// verify fan-out order across two subscribers.
/// </summary>
private sealed class MultiEventFakeWorkerProcessLauncher(int eventCount) : IWorkerProcessLauncher
{
public const int ProcessId = 7710;
private readonly FakeWorkerProcess _process = new(ProcessId);
public Task WorkerTask { get; private set; } = Task.CompletedTask;
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(new WorkerProcessHandle(
_process,
new WorkerProcessCommandLine("multi-event-fake-worker.exe", []),
DateTimeOffset.UtcNow));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested)
{
WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_process.MarkExited(0);
return;
}
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}.");
}
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
// After Advise emit all events immediately.
if (command.Kind == MxCommandKind.Advise)
{
for (int i = 1; i <= eventCount; i++)
{
int index = i;
await harness.EmitEventAsync(
MxEventFamily.OnDataChange,
cancellationToken,
mxEvent =>
{
mxEvent.ServerHandle = command.Advise.ServerHandle;
mxEvent.ItemHandle = command.Advise.ItemHandle;
mxEvent.Quality = 192;
mxEvent.Value = new MxValue
{
DataType = MxDataType.String,
StringValue = $"scripted-value-{index}",
};
mxEvent.OnDataChange = new OnDataChangeEvent();
}).ConfigureAwait(false);
}
}
}
}
}
/// <summary>
/// Fake worker that emits events one at a time, gated by
/// <see cref="AllowNextEvent"/>. The test drives the timing so assertions are
/// deterministic. Call <see cref="StopEmitting"/> before closing the session so the
/// worker loop exits cleanly and can process the shutdown envelope.
/// </summary>
private sealed class GatedEventFakeWorkerProcessLauncher : IWorkerProcessLauncher
{
public const int ProcessId = 7720;
private readonly FakeWorkerProcess _process = new(ProcessId);
// Max count 64 so AllowNextEvent can be called ahead of time without overflowing the gate.
private readonly SemaphoreSlim _emitGate = new(0, 64);
private volatile bool _stopEmitting;
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <summary>Releases the gate so the worker emits one event.</summary>
public void AllowNextEvent() => _emitGate.Release();
/// <summary>
/// Signals the worker to stop waiting for the emit gate and process the
/// shutdown envelope. Must be called before <c>CloseSession</c>.
/// </summary>
public void StopEmitting()
{
_stopEmitting = true;
_emitGate.Release(); // unblock a pending gate wait if any
}
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(new WorkerProcessHandle(
_process,
new WorkerProcessCommandLine("gated-event-fake-worker.exe", []),
DateTimeOffset.UtcNow));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
int advisedServerHandle = 0;
int advisedItemHandle = 0;
int emittedCount = 0;
// Read envelopes one at a time. Between each envelope, if we have a
// subscription and the gate is ready, emit an event before reading the
// next envelope. When _stopEmitting is set, drain the gate and read
// remaining envelopes (including shutdown) without emitting.
while (!cancellationToken.IsCancellationRequested)
{
// While subscribed and not stopped, emit gated events using a zero-timeout
// (non-blocking) check of the gate, then yield to incoming envelopes to
// avoid starving shutdown processing.
while (advisedServerHandle != 0
&& !_stopEmitting
&& await _emitGate.WaitAsync(millisecondsTimeout: 0).ConfigureAwait(false))
{
int index = ++emittedCount;
await harness.EmitEventAsync(
MxEventFamily.OnDataChange,
cancellationToken,
mxEvent =>
{
mxEvent.ServerHandle = advisedServerHandle;
mxEvent.ItemHandle = advisedItemHandle;
mxEvent.Quality = 192;
mxEvent.Value = new MxValue
{
DataType = MxDataType.String,
StringValue = $"gated-value-{index}",
};
mxEvent.OnDataChange = new OnDataChangeEvent();
}).ConfigureAwait(false);
}
// Use a short timeout so the emit loop above is re-evaluated
// periodically — but long enough not to spam.
WorkerEnvelope? envelope = null;
try
{
using CancellationTokenSource readCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
readCts.CancelAfter(TimeSpan.FromMilliseconds(50));
envelope = await harness.ReadGatewayEnvelopeAsync(readCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
// Timed out waiting for an envelope — loop back to check gate / emit.
continue;
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_process.MarkExited(0);
return;
}
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}.");
}
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
if (command.Kind == MxCommandKind.Advise)
{
advisedServerHandle = command.Advise.ServerHandle;
advisedItemHandle = command.Advise.ItemHandle;
}
}
}
}
}
@@ -300,6 +300,218 @@ public sealed class EventStreamServiceTests
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
/// <summary>
/// Task 12: resuming with AfterWorkerSequence inside the retained window replays exactly
/// the newer retained events (in order, no dup) then live, with NO ReplayGap sentinel.
/// </summary>
[Fact]
public async Task StreamEventsAsync_ResumeWithinRetainedWindow_ReplaysNewerThenLive_NoSentinel()
{
System.Threading.Channels.Channel<WorkerEvent> live =
System.Threading.Channels.Channel.CreateUnbounded<WorkerEvent>();
FakeWorkerClient workerClient = new() { LiveEvents = live };
for (ulong sequence = 1; sequence <= 5; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
// Prime: drain the static 1..5 through a first subscriber so the replay ring retains them.
await PrimeReplayAsync(service, session.SessionId, expectedCount: 5);
// Resume after sequence 2: retained window [1..5] covers it — replay 3,4,5 then live.
await using IAsyncEnumerator<MxEvent> resume = service
.StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 2), CancellationToken.None)
.GetAsyncEnumerator();
MxEvent r3 = await ReadNextAsync(resume);
MxEvent r4 = await ReadNextAsync(resume);
MxEvent r5 = await ReadNextAsync(resume);
Assert.Equal(new ulong[] { 3, 4, 5 }, new[] { r3.WorkerSequence, r4.WorkerSequence, r5.WorkerSequence });
Assert.Null(r3.ReplayGap);
// No sentinel anywhere; next is a LIVE event.
live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange));
MxEvent liveEvent = await ReadNextAsync(resume);
Assert.Equal(6ul, liveEvent.WorkerSequence);
Assert.Null(liveEvent.ReplayGap);
}
/// <summary>
/// Task 12: resuming with AfterWorkerSequence older than the oldest retained yields the
/// ReplayGap sentinel FIRST (correct requested/oldest), then the retained tail, then live.
/// </summary>
[Fact]
public async Task StreamEventsAsync_ResumeOlderThanOldestRetained_EmitsSentinelFirst_ThenTailThenLive()
{
System.Threading.Channels.Channel<WorkerEvent> live =
System.Threading.Channels.Channel.CreateUnbounded<WorkerEvent>();
FakeWorkerClient workerClient = new() { LiveEvents = live };
for (ulong sequence = 1; sequence <= 5; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
// Replay capacity 3 retains only 3,4,5; 1,2 are evicted.
GatewaySession session = CreateReadySession(workerClient, replayBufferCapacity: 3);
EventStreamService service = CreateService(new FakeSessionManager(session));
await PrimeReplayAsync(service, session.SessionId, expectedCount: 5);
// Resume after 1: events 1,2 are below the oldest retained (3) and were evicted, so
// they are unrecoverable => sentinel first, then the retained tail 3,4,5, then live.
await using IAsyncEnumerator<MxEvent> realResume = service
.StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 1), CancellationToken.None)
.GetAsyncEnumerator();
MxEvent sentinel = await ReadNextAsync(realResume);
Assert.NotNull(sentinel.ReplayGap);
Assert.Equal(1ul, sentinel.ReplayGap.RequestedAfterSequence);
Assert.Equal(3ul, sentinel.ReplayGap.OldestAvailableSequence);
Assert.Equal(MxEventFamily.Unspecified, sentinel.Family);
Assert.Equal(session.SessionId, sentinel.SessionId);
MxEvent r3 = await ReadNextAsync(realResume);
MxEvent r4 = await ReadNextAsync(realResume);
MxEvent r5 = await ReadNextAsync(realResume);
Assert.Equal(new ulong[] { 3, 4, 5 }, new[] { r3.WorkerSequence, r4.WorkerSequence, r5.WorkerSequence });
Assert.Null(r3.ReplayGap);
live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange));
MxEvent liveEvent = await ReadNextAsync(realResume);
Assert.Equal(6ul, liveEvent.WorkerSequence);
}
/// <summary>
/// Task 12: the replay→live boundary is contiguous — no duplicate and no skip — even
/// when events span the handoff.
/// </summary>
[Fact]
public async Task StreamEventsAsync_ResumeHandoff_IsContiguous_NoDuplicateNoSkip()
{
System.Threading.Channels.Channel<WorkerEvent> live =
System.Threading.Channels.Channel.CreateUnbounded<WorkerEvent>();
FakeWorkerClient workerClient = new() { LiveEvents = live };
for (ulong sequence = 1; sequence <= 4; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
await PrimeReplayAsync(service, session.SessionId, expectedCount: 4);
// Resume after 2: replay 3,4 then live 5,6,7. Collect across the boundary and assert
// the full sequence is contiguous with no duplicate and no skip.
await using IAsyncEnumerator<MxEvent> resume = service
.StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 2), CancellationToken.None)
.GetAsyncEnumerator();
List<ulong> collected = [];
collected.Add((await ReadNextAsync(resume)).WorkerSequence); // 3
collected.Add((await ReadNextAsync(resume)).WorkerSequence); // 4
for (ulong sequence = 5; sequence <= 7; sequence++)
{
live.Writer.TryWrite(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
collected.Add((await ReadNextAsync(resume)).WorkerSequence);
}
Assert.Equal(new ulong[] { 3, 4, 5, 6, 7 }, collected);
}
/// <summary>
/// Task 12: the per-item filter applies to REPLAYED events identically to live — a
/// replayed event at/below the requested watermark is never delivered.
/// </summary>
[Fact]
public async Task StreamEventsAsync_ResumeReplay_AppliesPerItemFilter_DropsAtOrBelowWatermark()
{
System.Threading.Channels.Channel<WorkerEvent> live =
System.Threading.Channels.Channel.CreateUnbounded<WorkerEvent>();
FakeWorkerClient workerClient = new() { LiveEvents = live };
for (ulong sequence = 1; sequence <= 5; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
await PrimeReplayAsync(service, session.SessionId, expectedCount: 5);
// Resume after 3: only 4,5 may be delivered. Events 1,2,3 — present in the ring but at
// or below the watermark — must be filtered out of the replay, never seen. The first two
// reads must be exactly 4 then 5 (no sentinel, no <=3 event); a live tag confirms the
// stream resumed live strictly after 5.
await using IAsyncEnumerator<MxEvent> resume = service
.StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 3), CancellationToken.None)
.GetAsyncEnumerator();
MxEvent first = await ReadNextAsync(resume);
MxEvent second = await ReadNextAsync(resume);
Assert.Equal(4ul, first.WorkerSequence);
Assert.Equal(5ul, second.WorkerSequence);
Assert.Null(first.ReplayGap);
Assert.Null(second.ReplayGap);
// The very next delivered event is the live 6 — proving nothing <=3 slipped in and the
// handoff resumed strictly after the replay tail.
live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange));
MxEvent liveEvent = await ReadNextAsync(resume);
Assert.Equal(6ul, liveEvent.WorkerSequence);
}
/// <summary>
/// Task 12: AfterWorkerSequence == 0 is a fresh stream (not a resume) — no replay, no
/// sentinel, just live events as before.
/// </summary>
[Fact]
public async Task StreamEventsAsync_FreshStreamAfterSequenceZero_NoReplayNoSentinel()
{
FakeWorkerClient workerClient = new();
for (ulong sequence = 1; sequence <= 3; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
workerClient.CompleteAfterConfiguredEvents = true;
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal(new ulong[] { 1, 2, 3 }, events.Select(e => e.WorkerSequence));
Assert.DoesNotContain(events, e => e.ReplayGap is not null);
}
// Drains the first `expectedCount` events through a throwaway subscriber so the session's
// replay ring retains them, then disposes the subscriber. The pump (started on first
// attach) keeps running for the session, so subsequent resume attaches see the retained
// events.
private static async Task PrimeReplayAsync(
EventStreamService service,
string sessionId,
int expectedCount)
{
await using IAsyncEnumerator<MxEvent> primer = service
.StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None)
.GetAsyncEnumerator();
for (int i = 0; i < expectedCount; i++)
{
await ReadNextAsync(primer);
}
}
private static async Task<MxEvent> ReadNextAsync(IAsyncEnumerator<MxEvent> enumerator)
{
Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
return enumerator.Current;
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
@@ -334,11 +546,12 @@ public sealed class EventStreamServiceTests
return events;
}
private static StreamEventsRequest CreateRequest(string sessionId)
private static StreamEventsRequest CreateRequest(string sessionId, ulong afterWorkerSequence = 0)
{
return new StreamEventsRequest
{
SessionId = sessionId,
AfterWorkerSequence = afterWorkerSequence,
};
}
@@ -347,7 +560,8 @@ public sealed class EventStreamServiceTests
string sessionId = "session-events",
int queueCapacity = 8,
GatewayMetrics? metrics = null,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast,
int replayBufferCapacity = 1024)
{
// The per-subscriber overflow policy now lives in the session's
// SessionEventDistributor, so the session must share the same metrics sink and
@@ -373,6 +587,8 @@ public sealed class EventStreamServiceTests
{
QueueCapacity = queueCapacity,
BackpressurePolicy = backpressurePolicy,
ReplayBufferCapacity = replayBufferCapacity,
ReplayRetentionSeconds = 0,
},
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
@@ -513,6 +729,13 @@ public sealed class EventStreamServiceTests
/// <summary>Gets or sets whether to complete the event stream after configured events are yielded.</summary>
public bool CompleteAfterConfiguredEvents { get; set; }
/// <summary>
/// Optional live channel source. When set, the worker drains the static
/// <see cref="Events"/> first, then streams from this channel until it completes,
/// letting a test feed events on demand (e.g. to exercise replay→live handoff).
/// </summary>
public System.Threading.Channels.Channel<WorkerEvent>? LiveEvents { get; init; }
/// <summary>Gets or sets an optional exception to throw as a terminal event stream fault.</summary>
public Exception? TerminalException { get; init; }
@@ -558,6 +781,18 @@ public sealed class EventStreamServiceTests
throw TerminalException;
}
if (LiveEvents is not null)
{
await foreach (WorkerEvent liveEvent in LiveEvents.Reader
.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
yield return liveEvent;
}
yield break;
}
if (CompleteAfterConfiguredEvents)
{
yield break;
@@ -151,7 +151,7 @@ public sealed class GatewaySessionDashboardMirrorTests
session.MarkReady();
Assert.Equal(0, session.ActiveEventSubscriberCount);
using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false);
using IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
Assert.Equal(1, session.ActiveEventSubscriberCount);
}
@@ -1,11 +1,14 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
@@ -166,8 +169,8 @@ public sealed class GatewaySessionTests
/// completion and a client cancellation both fire at the same time — must
/// decrement <c>_activeEventSubscriberCount</c> exactly once, never to -1.
/// A negative count permanently blocks future subscribers because
/// <c>AttachEventSubscriber(allowMultipleSubscribers:false)</c> gates on
/// <c>_activeEventSubscriberCount > 0</c>. After both racing disposes finish,
/// <c>AttachEventSubscriber</c> gates on <c>_activeEventSubscriberCount >= effectiveCap</c>.
/// After both racing disposes finish,
/// the count must be exactly 0 and a subsequent single-subscriber attach must
/// succeed.
/// </summary>
@@ -184,8 +187,7 @@ public sealed class GatewaySessionTests
for (int i = 0; i < Iterations; i++)
{
// Attach one subscriber; this increments _activeEventSubscriberCount to 1.
IEventSubscriberLease lease = session.AttachEventSubscriber(
allowMultipleSubscribers: false);
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
// Race Concurrency threads all calling Dispose() on the same lease.
// Only one must actually run DetachEventSubscriber.
@@ -210,8 +212,7 @@ public sealed class GatewaySessionTests
// Observable contract: a fresh single subscriber must now be attachable
// (i.e., the guard _activeEventSubscriberCount > 0 is false).
IEventSubscriberLease next = session.AttachEventSubscriber(
allowMultipleSubscribers: false);
IEventSubscriberLease next = session.AttachEventSubscriber(maxSubscribers: 1);
next.Dispose();
Assert.Equal(0, session.ActiveEventSubscriberCount);
}
@@ -220,6 +221,432 @@ public sealed class GatewaySessionTests
await session.DisposeAsync();
}
/// <summary>
/// Task 8 regression. Single-subscriber mode rejects a SECOND concurrent external
/// attach with <see cref="SessionManagerErrorCode.EventSubscriberAlreadyActive"/> —
/// the legacy guard is preserved unchanged when multi-subscriber is disabled.
/// </summary>
[Fact]
public async Task AttachEventSubscriber_SingleMode_SecondAttachThrowsAlreadyActive()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySessionWithEventStreaming(workerClient);
using IEventSubscriberLease first = session.AttachEventSubscriber(maxSubscribers: 8);
SessionManagerException exception = Assert.Throws<SessionManagerException>(
() => session.AttachEventSubscriber(maxSubscribers: 8));
Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode);
Assert.Equal(1, session.ActiveEventSubscriberCount);
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
/// <summary>
/// Task 8. Multi-subscriber mode allows exactly <c>cap</c> concurrent external
/// subscribers; the (cap+1)-th attach throws
/// <see cref="SessionManagerErrorCode.EventSubscriberLimitReached"/>.
/// </summary>
[Fact]
public async Task AttachEventSubscriber_MultiMode_AttachesUpToCapThenThrowsLimitReached()
{
const int Cap = 4;
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySessionWithEventStreaming(
workerClient,
allowMultipleEventSubscribers: true);
List<IEventSubscriberLease> leases = [];
for (int i = 0; i < Cap; i++)
{
leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap));
}
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
SessionManagerException exception = Assert.Throws<SessionManagerException>(
() => session.AttachEventSubscriber(maxSubscribers: Cap));
Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode);
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
foreach (IEventSubscriberLease lease in leases)
{
lease.Dispose();
}
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
/// <summary>
/// Task 8. The gateway-owned INTERNAL dashboard subscriber must NOT consume cap
/// budget: with the dashboard mirror running, the full cap of external subscribers is
/// still attachable.
/// </summary>
[Fact]
public async Task AttachEventSubscriber_MultiMode_DashboardMirrorDoesNotConsumeCap()
{
const int Cap = 3;
FakeWorkerClient workerClient = new();
RecordingDashboardEventBroadcaster broadcaster = new();
GatewaySession session = CreateReadySessionWithEventStreaming(
workerClient,
allowMultipleEventSubscribers: true,
dashboardBroadcaster: broadcaster);
// The internal dashboard mirror registered on MarkReady is NOT counted as an external
// subscriber, so the external active count starts at zero.
Assert.Equal(0, session.ActiveEventSubscriberCount);
List<IEventSubscriberLease> leases = [];
for (int i = 0; i < Cap; i++)
{
leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap));
}
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
// The (cap+1)-th still fails: the dashboard mirror did not eat a slot.
SessionManagerException exception = Assert.Throws<SessionManagerException>(
() => session.AttachEventSubscriber(maxSubscribers: Cap));
Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode);
foreach (IEventSubscriberLease lease in leases)
{
lease.Dispose();
}
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
/// <summary>
/// Task 8 concurrency. Many concurrent attaches in multi-subscriber mode must never
/// exceed the cap: the count-check-and-increment is atomic under <c>_syncRoot</c>, so
/// exactly <c>cap</c> attaches succeed and the rest throw
/// <see cref="SessionManagerErrorCode.EventSubscriberLimitReached"/>. The observed
/// count never goes above the cap.
/// </summary>
[Fact]
public async Task AttachEventSubscriber_MultiMode_ConcurrentAttaches_NeverExceedCap()
{
const int Cap = 5;
const int Attempts = 32;
TimeSpan testTimeout = TimeSpan.FromSeconds(10);
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySessionWithEventStreaming(
workerClient,
allowMultipleEventSubscribers: true);
using SemaphoreSlim gate = new(0);
int successCount = 0;
int limitReachedCount = 0;
int maxObservedCount = 0;
IEventSubscriberLease?[] leases = new IEventSubscriberLease?[Attempts];
Task[] tasks = new Task[Attempts];
for (int t = 0; t < Attempts; t++)
{
int index = t;
tasks[index] = Task.Run(async () =>
{
// Every task parks on the gate until it is released below, so the attaches contend at once.
await gate.WaitAsync(testTimeout);
try
{
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: Cap);
leases[index] = lease;
Interlocked.Increment(ref successCount);
}
catch (SessionManagerException exception)
when (exception.ErrorCode == SessionManagerErrorCode.EventSubscriberLimitReached)
{
Interlocked.Increment(ref limitReachedCount);
}
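int observed = session.ActiveEventSubscriberCount;
// Lock-free running maximum: retry the compare-exchange until this sample is stored,
// or stop early once another task has already recorded a value at least as large.
int previousMax;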
do
{
previousMax = Volatile.Read(ref maxObservedCount);
if (observed <= previousMax)
{
break;
}
}
while (Interlocked.CompareExchange(ref maxObservedCount, observed, previousMax) != previousMax);
});
}
gate.Release(Attempts);
await Task.WhenAll(tasks).WaitAsync(testTimeout);
Assert.Equal(Cap, successCount);
Assert.Equal(Attempts - Cap, limitReachedCount);
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
Assert.True(maxObservedCount <= Cap, $"Observed count {maxObservedCount} exceeded cap {Cap}.");
foreach (IEventSubscriberLease? lease in leases)
{
lease?.Dispose();
}
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
/// <summary>
/// Task 8. Disposing a subscriber frees a cap slot so a fresh attach succeeds, and a
/// double-dispose does not double-free the slot (count integrity preserved).
/// </summary>
[Fact]
public async Task AttachEventSubscriber_MultiMode_DisposeFreesSlotAndDoubleDisposeIsIdempotent()
{
const int Cap = 2;
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySessionWithEventStreaming(
workerClient,
allowMultipleEventSubscribers: true);
IEventSubscriberLease a = session.AttachEventSubscriber(maxSubscribers: Cap);
IEventSubscriberLease b = session.AttachEventSubscriber(maxSubscribers: Cap);
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
// At cap: next attach is rejected.
Assert.Throws<SessionManagerException>(
() => session.AttachEventSubscriber(maxSubscribers: Cap));
// Dispose one — and dispose it twice. The second dispose must not double-free.
a.Dispose();
a.Dispose();
Assert.Equal(1, session.ActiveEventSubscriberCount);
// Exactly one slot is free, so exactly one fresh attach succeeds.
using IEventSubscriberLease c = session.AttachEventSubscriber(maxSubscribers: Cap);
Assert.Equal(Cap, session.ActiveEventSubscriberCount);
Assert.Throws<SessionManagerException>(
() => session.AttachEventSubscriber(maxSubscribers: Cap));
b.Dispose();
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
/// <summary>
/// Task 11. With a positive detach-grace, dropping the last external subscriber must
/// RETAIN the session (it stays <see cref="SessionState.Ready"/>, not Closed/Faulted)
/// and record a detached timestamp so the lease monitor can age it out later.
/// </summary>
[Fact]
public async Task DetachGrace_LastSubscriberDrops_RetainsSessionAndRecordsDetachedTimestamp()
{
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
FakeWorkerClient workerClient = new();
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.FromSeconds(30));
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
Assert.Null(session.DetachedAtUtc);
lease.Dispose();
// Retained, not torn down.
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(0, session.ActiveEventSubscriberCount);
Assert.Equal(clock.GetUtcNow(), session.DetachedAtUtc);
Assert.False(session.IsDetachGraceExpired(clock.GetUtcNow()));
}
/// <summary>
/// Task 11. Advancing the clock past the detach-grace window makes the retained,
/// detached session eligible for close (<see cref="GatewaySession.IsDetachGraceExpired"/>).
/// </summary>
[Fact]
public async Task DetachGrace_ClockAdvancesPastWindow_SessionBecomesEligibleForClose()
{
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
FakeWorkerClient workerClient = new();
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.FromSeconds(30));
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
lease.Dispose();
// Just before the window elapses: not yet eligible.
clock.Advance(TimeSpan.FromSeconds(29));
Assert.False(session.IsDetachGraceExpired(clock.GetUtcNow()));
// At/after the window: eligible for close.
clock.Advance(TimeSpan.FromSeconds(1));
Assert.True(session.IsDetachGraceExpired(clock.GetUtcNow()));
}
/// <summary>
/// Task 11. Re-attaching a subscriber before the window elapses cancels the grace:
/// the detached timestamp clears and a subsequent clock advance does NOT make the
/// session eligible for close.
/// </summary>
[Fact]
public async Task DetachGrace_ReattachBeforeExpiry_CancelsGrace()
{
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
FakeWorkerClient workerClient = new();
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.FromSeconds(30));
IEventSubscriberLease first = session.AttachEventSubscriber(maxSubscribers: 1);
first.Dispose();
Assert.NotNull(session.DetachedAtUtc);
clock.Advance(TimeSpan.FromSeconds(10));
IEventSubscriberLease second = session.AttachEventSubscriber(maxSubscribers: 1);
// Re-attach cancelled the grace window.
Assert.Null(session.DetachedAtUtc);
// Advancing well past what would have been the window does not make it eligible while
// a subscriber is attached.
clock.Advance(TimeSpan.FromMinutes(5));
Assert.False(session.IsDetachGraceExpired(clock.GetUtcNow()));
second.Dispose();
}
/// <summary>
/// Task 11. With detach-grace disabled (<c>0</c>), dropping the last subscriber must
/// match today's behavior: the session stays <see cref="SessionState.Ready"/> with no
/// detached timestamp and is never eligible for a detach-grace close — it lingers only
/// until its normal lease expires.
/// </summary>
[Fact]
public async Task DetachGrace_Disabled_MatchesTodaysBehavior()
{
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
FakeWorkerClient workerClient = new();
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.Zero);
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
lease.Dispose();
Assert.Equal(SessionState.Ready, session.State);
Assert.Null(session.DetachedAtUtc);
clock.Advance(TimeSpan.FromHours(1));
Assert.False(session.IsDetachGraceExpired(clock.GetUtcNow()));
}
/// <summary>
/// Task 11. The gateway-owned internal dashboard subscriber must NOT keep a session out
/// of detach-grace: when the last external gRPC subscriber drops and only the dashboard
/// mirror remains attached, the session still enters grace and the window still expires.
/// </summary>
[Fact]
public async Task DetachGrace_DashboardMirrorAlone_DoesNotPreventGraceEntry()
{
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
FakeWorkerClient workerClient = new();
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.FromSeconds(30),
dashboardBroadcaster: broadcaster);
// The dashboard mirror is the only subscriber (registered internally at MarkReady).
// It is not counted as an external subscriber.
Assert.Equal(0, session.ActiveEventSubscriberCount);
IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1);
lease.Dispose();
// Entered grace despite the dashboard mirror still being attached.
Assert.NotNull(session.DetachedAtUtc);
clock.Advance(TimeSpan.FromSeconds(30));
Assert.True(session.IsDetachGraceExpired(clock.GetUtcNow()));
}
/// <summary>
/// Task 11. Validates the TOCTOU fix: <c>TryBeginCloseIfExpired</c> atomically re-checks that
/// no subscriber has reattached before flipping to <c>Closing</c>. When the grace window has
/// elapsed but a subscriber is attached by the time <c>TryBeginCloseIfExpired</c> runs, it
/// returns <c>false</c> and the session remains <see cref="SessionState.Ready"/>.
/// </summary>
[Fact]
public async Task TryBeginCloseIfExpired_ReattachedSubscriberWinsRace_DeclinesClose()
{
FakeWorkerClient workerClient = new();
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
await using GatewaySession session = CreateReadySessionWithDetachGrace(
workerClient,
clock,
detachGrace: TimeSpan.FromSeconds(30));
// Attach then drop to enter detach-grace, then advance past the window.
IDisposable firstSubscriber = session.AttachEventSubscriber(maxSubscribers: 1);
firstSubscriber.Dispose();
clock.Advance(TimeSpan.FromSeconds(31));
DateTimeOffset expiredNow = clock.GetUtcNow();
Assert.True(session.IsDetachGraceExpired(expiredNow)); // sanity: would be closed by sweep
// Simulate a client reconnecting before the sweeper calls TryBeginCloseIfExpired.
// The reattach clears _detachedAtUtc and increments _activeEventSubscriberCount so
// neither expiry condition holds any longer.
using IDisposable reconnected = session.AttachEventSubscriber(maxSubscribers: 1);
Assert.Null(session.DetachedAtUtc);
// TryBeginCloseIfExpired must see the reattach and decline — the session stays Ready.
bool began = session.TryBeginCloseIfExpired(expiredNow, out bool alreadyClosing);
Assert.False(began);
Assert.False(alreadyClosing);
Assert.Equal(SessionState.Ready, session.State);
}
private static GatewaySession CreateReadySessionWithDetachGrace(
IWorkerClient workerClient,
TimeProvider timeProvider,
TimeSpan detachGrace,
IDashboardEventBroadcaster? dashboardBroadcaster = null)
{
GatewaySession session = new(
sessionId: "session-test-detach-grace",
backendName: "mxaccess",
pipeName: "mxaccess-gateway-1-session-test-detach-grace",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: timeProvider.GetUtcNow(),
eventStreaming: new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
timeProvider,
new GatewayMetrics(),
dashboardBroadcaster),
detachGrace: detachGrace);
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
}
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
{
GatewaySession session = new(
@@ -241,7 +668,10 @@ public sealed class GatewaySessionTests
return session;
}
private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient)
private static GatewaySession CreateReadySessionWithEventStreaming(
IWorkerClient workerClient,
bool allowMultipleEventSubscribers = false,
IDashboardEventBroadcaster? dashboardBroadcaster = null)
{
GatewaySession session = new(
sessionId: "session-test-concurrent",
@@ -262,7 +692,9 @@ public sealed class GatewaySessionTests
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
new GatewayMetrics()));
new GatewayMetrics(),
dashboardBroadcaster,
allowMultipleEventSubscribers));
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
@@ -113,6 +113,27 @@ public sealed class SessionEventDistributorTests
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
}
[Fact]
public async Task RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException()
{
// Pins the nested-lock disposal behavior in RegisterWithReplay: the inner
// _lifecycleLock check must surface ObjectDisposedException even when the outer
// _replayLock snapshot succeeds on a disposed distributor.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
Assert.Throws<ObjectDisposedException>(() =>
distributor.RegisterWithReplay(
0,
out _,
out _,
out _,
out _));
}
[Fact]
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
{
@@ -355,7 +376,8 @@ public sealed class SessionEventDistributorTests
Interlocked.Increment(ref overflowCalls);
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
Volatile.Write(ref observedIsOnlySubscriberSet, 1);
});
},
singleSubscriberMode: false);
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: registered but never read, so its capacity-2 channel fills.
@@ -377,7 +399,7 @@ public sealed class SessionEventDistributorTests
async () => await DrainUntilFaultAsync(slow.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
// Multi-subscriber mode, so isOnlySubscriber is always false (Task 8 mode-gating).
// Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
// pump-thread writes, avoiding a data race under the C# memory model.
Assert.Equal(1, Volatile.Read(ref overflowCalls));
@@ -395,15 +417,12 @@ public sealed class SessionEventDistributorTests
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
{
// Distributor-level pin for "FailFast with multiple subscribers degrades to
// disconnect-only (no session fault)": when the overflowing subscriber is NOT the
// sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT
// fault the session. This test drives the distributor directly (without GatewaySession)
// with two subscribers and a FailFast-style overflow handler seam, overflows the slow
// one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps
// receiving, and (c) the pump keeps running — all without a GatewaySession.
//
// TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
// multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
// disconnect-only (no session fault)": in multi-subscriber mode isOnlySubscriber is
// always false (Task 8 mode-gating), so a FailFast-wired handler must NOT fault the
// session. This test drives the distributor directly (without GatewaySession) in
// multi-subscriber mode with two subscribers and a FailFast-style overflow handler
// seam, overflows the slow one, and asserts (a) isOnlySubscriber==false, (b) the other
// subscriber keeps receiving, and (c) the pump keeps running.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
bool handlerFiredWithFalse = false;
bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault
@@ -427,7 +446,8 @@ public sealed class SessionEventDistributorTests
// Single-subscriber: FailFast would fault the session — must not happen here.
Volatile.Write(ref sessionFaultWouldBeCalled, true);
}
});
},
singleSubscriberMode: false);
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: never reads, so capacity-2 channel overflows quickly.
@@ -520,6 +540,165 @@ public sealed class SessionEventDistributorTests
"isInternal must be true for a subscriber registered with isInternal: true.");
}
[Fact]
public async Task SingleSubscriberMode_LoneExternalOverflow_HandlerSeesIsOnlySubscriberTrue()
{
// Task 8 mode-gating: in single-subscriber mode a lone external subscriber that
// overflows reports isOnlySubscriber==true, so the legacy FailFast session-fault path
// is preserved. The decision is gated on the fixed session mode, NOT a live count, so
// it is race-free.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
int observedSet = 0;
bool observedValue = false;
await using SessionEventDistributor distributor = new(
"session-single-sub",
ct => source.Reader.ReadAllAsync(ct),
subscriberQueueCapacity: 2,
replayBufferCapacity: 0,
replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
(isOnlySubscriber, _) =>
{
Volatile.Write(ref observedValue, isOnlySubscriber);
Volatile.Write(ref observedSet, 1);
},
singleSubscriberMode: true);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease external = distributor.Register();
for (ulong sequence = 1; sequence <= 10; sequence++)
{
source.Writer.TryWrite(Event(sequence));
}
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
async () => await DrainUntilFaultAsync(external.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
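// Poll until the handler has fired. Once ReadTimeout elapses the token cancels and
// Task.Delay throws, so a handler that never fires fails the test instead of hanging.
await Task.Run(async () =>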
{
using CancellationTokenSource cts = new(ReadTimeout);
while (Volatile.Read(ref observedSet) == 0)
{
await Task.Delay(10, cts.Token);
}
});
// Guard: ensure the handler actually fired before asserting its observed value.
// Without this the test could pass vacuously if the overflow never triggered.
Assert.Equal(1, Volatile.Read(ref observedSet));
Assert.True(Volatile.Read(ref observedValue),
"isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode.");
}
[Fact]
public async Task RegisterWithReplay_WithinRetainedWindow_ReturnsNewerEvents_NoGap_ThenLive()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 10,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
// A primer subscriber forces the pump to retain events 1..5 deterministically.
using IEventSubscriberLease primer = distributor.Register();
for (ulong sequence = 1; sequence <= 5; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(primer.Reader);
}
// Resume after sequence 2: retained window [1..5] still covers it — no gap, replay 3..5.
using IEventSubscriberLease resume = distributor.RegisterWithReplay(
2,
out IReadOnlyList<MxEvent> replay,
out bool gap,
out ulong oldestAvailable,
out ulong liveResume);
Assert.False(gap);
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
Assert.Equal(5ul, liveResume);
// OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
Assert.Equal(0ul, oldestAvailable);
// A subsequent live event flows to the resumed subscriber's channel.
source.Writer.TryWrite(Event(6));
MxEvent live = await ReadOneAsync(resume.Reader);
Assert.Equal(6ul, live.WorkerSequence);
}
[Fact]
public async Task RegisterWithReplay_BelowOldestRetained_ReportsGap_AndOldestAvailable()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 3,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease primer = distributor.Register();
for (ulong sequence = 1; sequence <= 5; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(primer.Reader);
}
// Capacity 3 retains 3,4,5; events 1,2 were evicted. Resume after 0 => gap, oldest=3.
using IEventSubscriberLease resume = distributor.RegisterWithReplay(
0,
out IReadOnlyList<MxEvent> replay,
out bool gap,
out ulong oldestAvailable,
out ulong liveResume);
Assert.True(gap);
Assert.Equal(3ul, oldestAvailable);
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
Assert.Equal(5ul, liveResume);
}
[Fact]
public async Task RegisterWithReplay_NothingRetainedNewer_LiveResumeEqualsAfterSequence_NoGap()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 10,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease primer = distributor.Register();
for (ulong sequence = 1; sequence <= 3; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(primer.Reader);
}
// Resume after 3 (newest retained): nothing newer, fully caught up — no gap, empty
// replay, and the live filter resumes after the requested watermark unchanged.
using IEventSubscriberLease resume = distributor.RegisterWithReplay(
3,
out IReadOnlyList<MxEvent> replay,
out bool gap,
out ulong oldestAvailable,
out ulong liveResume);
Assert.False(gap);
Assert.Empty(replay);
Assert.Equal(3ul, liveResume);
// OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
Assert.Equal(0ul, oldestAvailable);
source.Writer.TryWrite(Event(4));
MxEvent live = await ReadOneAsync(resume.Reader);
Assert.Equal(4ul, live.WorkerSequence);
}
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)
@@ -1,5 +1,7 @@
using System.Diagnostics;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
@@ -365,6 +367,124 @@ public sealed class SessionManagerTests
Assert.Equal(0, workerClient.InvokeCount);
}
/// <summary>
/// With the opt-in worker-ready wait enabled, a worker that is transiently
/// <c>Handshaking</c> but flips to <c>Ready</c> within the timeout window must let the
/// command through rather than fail fast.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerHandshakingThenReadyWithinTimeout_Succeeds()
{
FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(workerReadyWaitTimeoutMs: 500));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
// Flip the worker to Ready shortly after the invoke starts waiting.
_ = Task.Run(async () =>
{
await Task.Delay(50, CancellationToken.None);
workerClient.State = WorkerClientState.Ready;
});
WorkerCommandReply reply = await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None);
Assert.NotNull(reply);
Assert.Equal(1, workerClient.InvokeCount);
}
/// <summary>
/// A terminal worker state (<c>Faulted</c>) must fail fast even with a positive
/// worker-ready wait timeout, surfacing both states without burning the timeout.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerFaulted_FailsFastWithBothStates()
{
FakeWorkerClient workerClient = new() { State = WorkerClientState.Faulted };
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(workerReadyWaitTimeoutMs: 500));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Stopwatch stopwatch = Stopwatch.StartNew();
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None));
stopwatch.Stop();
Assert.True(stopwatch.ElapsedMilliseconds < 100, $"Expected immediate fail-fast but took {stopwatch.ElapsedMilliseconds}ms.");
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
Assert.Contains("Session state is Ready", exception.Message);
Assert.Contains("worker state is Faulted", exception.Message);
Assert.Equal(0, workerClient.InvokeCount);
}
/// <summary>
/// When the worker stays transiently not-ready for the whole (small) timeout window,
/// the invoke fails after roughly the timeout with both states surfaced.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenTimeoutElapsesStillNotReady_FailsWithBothStates()
{
FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(workerReadyWaitTimeoutMs: 100));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Stopwatch stopwatch = Stopwatch.StartNew();
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None));
stopwatch.Stop();
Assert.True(stopwatch.ElapsedMilliseconds >= 90, $"Expected the wait to span the timeout but took only {stopwatch.ElapsedMilliseconds}ms.");
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
Assert.Contains("Session state is Ready", exception.Message);
Assert.Contains("worker state is Handshaking", exception.Message);
Assert.Equal(0, workerClient.InvokeCount);
}
/// <summary>
/// Pins the default (timeout == 0) behavior: a transiently <c>Handshaking</c> worker
/// fails fast immediately, exactly as it did before the opt-in wait was added.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenTimeoutZero_FailsFastUnchanged()
{
FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(workerReadyWaitTimeoutMs: 0));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Stopwatch stopwatch = Stopwatch.StartNew();
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None));
stopwatch.Stop();
Assert.True(stopwatch.ElapsedMilliseconds < 100, $"Expected immediate fail-fast but took {stopwatch.ElapsedMilliseconds}ms.");
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
Assert.Contains("Session state is Ready", exception.Message);
Assert.Contains("worker state is Handshaking", exception.Message);
Assert.Equal(0, workerClient.InvokeCount);
}
/// <summary>Verifies that closing a session removes it from the registry.</summary>
[Fact]
public async Task CloseSessionAsync_RemovesClosedSession()
@@ -743,7 +863,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
session.ExtendLease(now.AddSeconds(-1));
using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false);
using IDisposable eventSubscriber = session.AttachEventSubscriber(maxSubscribers: 1);
int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);
@@ -752,6 +872,83 @@ public sealed class SessionManagerTests
Assert.Equal(0, workerClient.ShutdownCount);
}
/// <summary>
/// Task 11. With detach-grace enabled, a session whose last external subscriber dropped
/// and whose detach-grace window has elapsed is closed by the lease sweep exactly like an
/// expired-lease session — even though its normal lease is still far in the future.
/// </summary>
[Fact]
public async Task CloseExpiredLeasesAsync_ClosesSessionWhoseDetachGraceWindowExpired()
{
FakeWorkerClient workerClient = new();
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(defaultLeaseSeconds: 1800, detachGraceSeconds: 30),
timeProvider: clock);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
// Attach and drop an external subscriber to enter detach-grace. The normal lease is
// still 30 minutes out, so only the detach-grace window can close this session.
IDisposable subscriber = session.AttachEventSubscriber(maxSubscribers: 1);
subscriber.Dispose();
Assert.NotNull(session.DetachedAtUtc);
// Before the window elapses: not closed.
clock.Advance(TimeSpan.FromSeconds(29));
int closedBefore = await manager.CloseExpiredLeasesAsync(clock.GetUtcNow(), CancellationToken.None);
Assert.Equal(0, closedBefore);
Assert.Equal(SessionState.Ready, session.State);
// After the window elapses: the sweep closes it.
clock.Advance(TimeSpan.FromSeconds(1));
int closedAfter = await manager.CloseExpiredLeasesAsync(clock.GetUtcNow(), CancellationToken.None);
Assert.Equal(1, closedAfter);
Assert.Equal(SessionState.Closed, session.State);
Assert.Equal(1, workerClient.ShutdownCount);
}
/// <summary>
/// Task 11. TOCTOU race: a session whose detach-grace window has expired but that
/// reattaches an external subscriber before the sweeper calls CloseSessionCoreAsync is
/// NOT closed — it remains Ready and usable. This validates that TryBeginCloseIfExpired
/// re-checks eligibility atomically so a reconnect that wins the race cancels the close.
/// </summary>
[Fact]
public async Task CloseExpiredLeasesAsync_DoesNotCloseSessionThatReattachedBeforeSweepCloses()
{
FakeWorkerClient workerClient = new();
FakeTimeProvider clock = new(DateTimeOffset.UtcNow);
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
options: CreateOptions(defaultLeaseSeconds: 1800, detachGraceSeconds: 30),
timeProvider: clock);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
// Attach and drop an external subscriber so the session enters detach-grace.
IDisposable firstSubscriber = session.AttachEventSubscriber(maxSubscribers: 1);
firstSubscriber.Dispose();
Assert.NotNull(session.DetachedAtUtc);
// Advance past the grace window so IsDetachGraceExpired returns true.
clock.Advance(TimeSpan.FromSeconds(31));
DateTimeOffset sweepTime = clock.GetUtcNow();
// Simulate a client reattaching before the sweep actually closes the session.
// The reattach clears _detachedAtUtc and increments _activeEventSubscriberCount,
// so TryBeginCloseIfExpired will see neither condition as met and decline.
using IDisposable reconnectedSubscriber = session.AttachEventSubscriber(maxSubscribers: 1);
Assert.Null(session.DetachedAtUtc);
// The sweep runs with the timestamp that was past the grace window, but since the
// subscriber has reattached, the session must NOT be closed.
int closedCount = await manager.CloseExpiredLeasesAsync(sweepTime, CancellationToken.None);
Assert.Equal(0, closedCount);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(0, workerClient.ShutdownCount);
}
/// <summary>Verifies that shutdown closes all registered sessions.</summary>
[Fact]
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
@@ -797,7 +994,9 @@ public sealed class SessionManagerTests
private static GatewayOptions CreateOptions(
int maxSessions = 64,
int defaultLeaseSeconds = 1800)
int defaultLeaseSeconds = 1800,
int detachGraceSeconds = 0,
int workerReadyWaitTimeoutMs = 0)
{
return new GatewayOptions
{
@@ -806,6 +1005,8 @@ public sealed class SessionManagerTests
DefaultCommandTimeoutSeconds = 30,
MaxSessions = maxSessions,
DefaultLeaseSeconds = defaultLeaseSeconds,
DetachGraceSeconds = detachGraceSeconds,
WorkerReadyWaitTimeoutMs = workerReadyWaitTimeoutMs,
},
Worker = new WorkerOptions
{
@@ -7,6 +7,7 @@ using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
@@ -330,62 +331,4 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests : IAsyncDisposable
DateTimeOffset.UtcNow);
}
/// <summary>
/// Fake worker process for testing process lifecycle. <see cref="WaitForExitAsync"/>
/// awaits a <see cref="TaskCompletionSource"/> completed only by
/// <see cref="Kill"/> or <see cref="MarkExited"/>, so a caller observing
/// completion can trust that exit actually happened — bringing this fake into
/// parity with the smoke-test variant in <c>GatewayEndToEndFakeWorkerSmokeTests</c>
/// (Tests-015 / Tests-023). This removes the latent regression vector where a
/// future <c>Assert.True(launcher.Process.HasExited)</c> in this file would
/// pass spuriously regardless of whether the worker truly exited.
/// </summary>
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
private bool _disposed;
/// <inheritdoc />
public int Id { get; } = processId;
/// <summary>Gets a value indicating whether the process has exited.</summary>
public bool HasExited { get; private set; }
/// <summary>Gets the process exit code, or null if the process has not exited.</summary>
public int? ExitCode { get; private set; }
/// <summary>Gets the number of times the Kill method was called.</summary>
public int KillCount { get; private set; }
/// <inheritdoc />
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
}
/// <inheritdoc />
public void Kill(bool entireProcessTree)
{
KillCount++;
MarkExited(-1);
}
/// <inheritdoc />
public void Dispose()
{
_disposed = true;
}
/// <summary>Gets a value indicating whether this process has been disposed.</summary>
public bool IsDisposed => _disposed;
/// <summary>Marks the process as exited with the specified exit code.</summary>
/// <param name="exitCode">The process exit code.</param>
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
}
@@ -158,7 +158,7 @@ public sealed class WorkerClientTests
public async Task ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess()
{
await using PipePair pipePair = await PipePair.CreateAsync();
FakeWorkerProcess process = new();
FakeWorkerProcess process = new(WorkerProcessId);
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
@@ -309,7 +309,7 @@ public sealed class WorkerClientTests
public async Task DisposeAsync_WhenOwnedWorkerStillRuns_KillsProcessBeforeDisposing()
{
await using PipePair pipePair = await PipePair.CreateAsync();
FakeWorkerProcess process = new();
FakeWorkerProcess process = new(WorkerProcessId);
WorkerClient client = CreateClient(pipePair, processHandle: CreateProcessHandle(process));
await client.DisposeAsync().AsTask().WaitAsync(TestTimeout);
@@ -764,49 +764,4 @@ public sealed class WorkerClientTests
}
}
private sealed class FakeWorkerProcess : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Gets the process ID.</summary>
public int Id { get; } = WorkerProcessId;
/// <summary>Gets a value indicating whether the process has exited.</summary>
public bool HasExited { get; private set; }
/// <summary>Gets the process exit code.</summary>
public int? ExitCode { get; private set; }
/// <summary>Gets the number of times kill was called.</summary>
public int KillCount { get; private set; }
/// <summary>Gets the last kill request's entire process tree flag.</summary>
public bool KillEntireProcessTree { get; private set; }
/// <summary>Gets a value indicating whether dispose was called.</summary>
public bool Disposed { get; private set; }
/// <inheritdoc />
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
}
/// <summary>Records a kill request.</summary>
/// <param name="entireProcessTree">Whether to kill the entire process tree.</param>
public void Kill(bool entireProcessTree)
{
KillCount++;
KillEntireProcessTree = entireProcessTree;
HasExited = true;
ExitCode = -1;
_exited.TrySetResult();
}
/// <inheritdoc />
public void Dispose()
{
Disposed = true;
}
}
}
@@ -4,6 +4,7 @@ using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
@@ -68,6 +69,7 @@ public sealed class WorkerProcessLauncherTests
Assert.Equal(WorkerProcessLaunchErrorCode.StartupFailed, exception.ErrorCode);
Assert.True(process.KillCalled);
Assert.True(process.KillEntireProcessTree);
Assert.True(process.DisposeCalled);
Assert.True(pipeReservation.DisposeCalled);
Assert.Equal(1, metrics.GetSnapshot().WorkerKills);
@@ -121,6 +123,7 @@ public sealed class WorkerProcessLauncherTests
Assert.Equal(WorkerProcessLaunchErrorCode.StartupTimeout, exception.ErrorCode);
Assert.True(process.KillCalled);
Assert.True(process.KillEntireProcessTree);
Assert.True(process.DisposeCalled);
Assert.Equal(1, metrics.GetSnapshot().WorkerKills);
}
@@ -240,45 +243,6 @@ public sealed class WorkerProcessLauncherTests
}
}
/// <summary>Fake worker process for testing process lifecycle and exit behavior.</summary>
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
/// <inheritdoc />
public int Id { get; } = processId;
/// <summary>Gets or sets a value indicating whether the process has exited.</summary>
public bool HasExited { get; set; }
/// <summary>Gets or sets the process exit code.</summary>
public int? ExitCode { get; set; }
/// <summary>Gets a value indicating whether the Dispose method was called.</summary>
public bool DisposeCalled { get; private set; }
/// <summary>Gets a value indicating whether the Kill method was called.</summary>
public bool KillCalled { get; private set; }
/// <inheritdoc />
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return ValueTask.CompletedTask;
}
/// <inheritdoc />
public void Kill(bool entireProcessTree)
{
Assert.True(entireProcessTree);
KillCalled = true;
HasExited = true;
}
/// <inheritdoc />
public void Dispose()
{
DisposeCalled = true;
}
}
/// <summary>Fake startup probe that immediately succeeds.</summary>
private sealed class SucceedingStartupProbe : IWorkerStartupProbe
{
@@ -0,0 +1,79 @@
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
/// <summary>
/// Lightweight in-process stand-in for <see cref="IWorkerProcess"/> used by fake worker
/// launchers and lifecycle tests.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="WaitForExitAsync"/> awaits a <see cref="TaskCompletionSource"/> that is
/// completed only by <see cref="Kill"/> or <see cref="MarkExited"/>, so a caller observing
/// completion can trust that exit actually happened rather than passing spuriously.
/// </para>
/// <para>
/// The disposal and kill bookkeeping is exposed under several aliases
/// (<see cref="IsDisposed"/>/<see cref="DisposeCalled"/>/<see cref="Disposed"/>;
/// <see cref="KillCount"/>/<see cref="KillCalled"/>) so the various lifecycle tests can
/// keep their existing assertion vocabulary while sharing one definition.
/// </para>
/// </remarks>
public sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Gets the process identifier.</summary>
public int Id { get; } = processId;
/// <summary>Gets or sets a value indicating whether the process has exited.</summary>
public bool HasExited { get; set; }
/// <summary>Gets or sets the exit code of the process, or <see langword="null"/> if it has not exited.</summary>
public int? ExitCode { get; set; }
/// <summary>Gets the number of times <see cref="Kill"/> was called.</summary>
public int KillCount { get; private set; }
/// <summary>Gets a value indicating whether <see cref="Kill"/> was called at least once.</summary>
public bool KillCalled => KillCount > 0;
/// <summary>Gets the <c>entireProcessTree</c> flag from the most recent <see cref="Kill"/> call.</summary>
public bool KillEntireProcessTree { get; private set; }
/// <summary>Gets a value indicating whether <see cref="Dispose"/> was called.</summary>
public bool IsDisposed { get; private set; }
/// <summary>Gets a value indicating whether <see cref="Dispose"/> was called.</summary>
public bool DisposeCalled => IsDisposed;
/// <summary>Gets a value indicating whether <see cref="Dispose"/> was called.</summary>
public bool Disposed => IsDisposed;
/// <inheritdoc />
public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
new(_exited.Task.WaitAsync(cancellationToken));
/// <inheritdoc />
public void Kill(bool entireProcessTree)
{
KillCount++;
KillEntireProcessTree = entireProcessTree;
MarkExited(-1);
}
/// <inheritdoc />
public void Dispose() => IsDisposed = true;
/// <summary>
/// Marks the process as exited with the specified exit code and unblocks
/// any callers of <see cref="WaitForExitAsync"/>.
/// </summary>
/// <param name="exitCode">The process exit code.</param>
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
@@ -4,7 +4,7 @@ namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
/// <summary>
/// Thread-safe <see cref="IServerStreamWriter{T}"/> that records every written message
/// and lets a test await the first message with a timeout.
/// and lets a test await the first message or a specific message count with a timeout.
/// </summary>
/// <typeparam name="T">The streamed message type.</typeparam>
public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
@@ -12,6 +12,7 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
private readonly object _syncRoot = new();
private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> _messages = [];
private readonly List<TaskCompletionSource<IReadOnlyList<T>>> _countWaiters = [];
/// <summary>Gets the messages written to this stream, in order.</summary>
public IReadOnlyList<T> Messages
@@ -33,12 +34,31 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
/// <returns>A completed task.</returns>
public Task WriteAsync(T message)
{
List<TaskCompletionSource<IReadOnlyList<T>>>? satisfied = null;
IReadOnlyList<T>? snapshot = null;
lock (_syncRoot)
{
_messages.Add(message);
_firstMessage.TrySetResult(message);
// Check whether any count waiters are now satisfied.
if (_countWaiters.Count > 0)
{
snapshot = _messages.ToArray();
satisfied = _countWaiters.ToList();
_countWaiters.Clear();
}
}
if (satisfied is not null && snapshot is not null)
{
foreach (TaskCompletionSource<IReadOnlyList<T>> waiter in satisfied)
{
waiter.TrySetResult(snapshot);
}
}
_firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
@@ -47,4 +67,60 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
/// <returns>The first message written to this stream.</returns>
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout) =>
await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
/// <summary>
/// Waits until at least <paramref name="count"/> messages have been written, then returns
/// the current snapshot. The wait is bounded by a single deadline of <c>now + timeout</c>;
/// intermediate wakeups (when a message arrives but the count is not yet met) consume from
/// that same deadline so the total elapsed time never exceeds <paramref name="timeout"/>.
/// If fewer than <paramref name="count"/> messages arrive before the deadline the call
/// throws <see cref="OperationCanceledException"/>.
/// </summary>
/// <param name="count">Minimum number of messages to wait for.</param>
/// <param name="timeout">Maximum total time to wait, measured from the moment of the call.</param>
/// <returns>A snapshot of all messages received so far (at least <paramref name="count"/>).</returns>
public async Task<IReadOnlyList<T>> WaitForMessageCountAsync(int count, TimeSpan timeout)
{
// Capture a single deadline so every iteration of the loop below draws from the
// same budget — using WaitAsync(timeout) per iteration would reset the clock on
// each intermediate wakeup, effectively giving N×timeout total budget.
using CancellationTokenSource deadlineCts = new(timeout);
CancellationToken deadlineToken = deadlineCts.Token;
TaskCompletionSource<IReadOnlyList<T>>? tcs = null;
lock (_syncRoot)
{
if (_messages.Count >= count)
{
return _messages.ToArray();
}
tcs = new TaskCompletionSource<IReadOnlyList<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
_countWaiters.Add(tcs);
}
// Re-check each time any message arrives. The TCS is satisfied on every write,
// but the caller may need more messages, so we loop until the count is met.
while (true)
{
IReadOnlyList<T> snapshot = await tcs.Task.WaitAsync(deadlineToken).ConfigureAwait(false);
if (snapshot.Count >= count)
{
return snapshot;
}
// Not enough yet — register a new waiter and keep waiting against the same deadline.
lock (_syncRoot)
{
if (_messages.Count >= count)
{
return _messages.ToArray();
}
tcs = new TaskCompletionSource<IReadOnlyList<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
_countWaiters.Add(tcs);
}
}
}
}
+14 -10
@@ -3,6 +3,8 @@
**Generated:** 2026-06-15 · **Commit:** `c7f754c` (main) · **Method:** six parallel read-only audits (Server, Worker, Contracts/proto, all five clients, docs/design/plans, tests + review backlog). Every item cites a verified `file:line`.
> **Resolution update (2026-06-15, branch `feat/stillpending-completion`):** The actionable items were implemented and verified per `docs/plans/2026-06-15-stillpending-completion.md`. **§1.1** (all 11 worker command kinds), **§1.2** (audit CorrelationId), and the **§4** client CLI/helper parity gaps are **Resolved** — see per-item annotations below. Worker COM commands are live-verified on the dev rig (`efd9971`, `f7ada90`). Remaining open items are the documented residuals (**§1.3**, **§1.4**, the **§3** vendor/capture-gated questions incl. the new **§3.2** multi-sample buffered residual) and the deliberate v1 scope of **§2**. Zero `.proto` changes were needed (all reply messages already existed).
>
> **Resolution update (2026-06-16, `main`):** The **session-resilience epic** (`docs/plans/2026-06-15-session-resilience.md`) landed its first **12 of 28 tasks** on `main` (through merge `c446bef`), which moves several **§2** items off "deferred": **multi-subscriber fan-out is now Resolved**, and **reconnectable sessions** has its server-side core (detach-grace window + replay-on-reconnect + a new `ReplayGap` signal on `MxEvent`). Still pending in that epic: reconnect Tasks 13–15 (owner re-validation, client `ReplayGap` handling, integration test), **Phase 4** per-session dashboard ACL (Tasks 16–19 — the §2/§7.6/§8 EventsHub-ACL item), and **Phase 5** orphan-worker reattach (Tasks 20–28). Separately, a `MxGateway:Dashboard:DisableLogin` dev flag shipped (`ca443b1`) — auto-authenticates the dashboard as a multi-role admin; default off, **enabled on the 10.100.0.48 deployment**. Per-item §2 status annotated below; remaining epic tasks tracked in `oldtasks.md`.
## How to read this
@@ -49,9 +51,9 @@ Items are graded by what they actually are, because most "pending" surface in th
These are documented, deliberate, and mostly enforced. Listed so the deferred surface is in one place — **none are bugs.** Canonical register: `docs/DesignDecisions.md:466-474` ("Later Revisit Items") + `gateway.md` "Post-v1 revisit items".
- 🔵 **Reconnectable sessions** — not in v1. `docs/DesignDecisions.md:63-73`, `gateway.md:1087,1101`.
- 🔵 **Multi-event-subscriber fan-out** — *plumbed but blocked.* The option flows all the way to `Sessions/GatewaySession.cs:387-408 AttachEventSubscriber(allowMultipleSubscribers)`, but `Configuration/GatewayOptionsValidator.cs:181-185` hard-rejects the only enabling value: *"AllowMultipleEventSubscribers is not supported until event fan-out is implemented."* So the fan-out code path never runs. `docs/DesignDecisions.md:75-80`.
- 🔵 **Gateway restart does not reattach orphan workers** — terminates them on startup. `docs/DesignDecisions.md:65-69`, `CLAUDE.md`.
- 🟡 **Reconnectable sessions — server-side core landed, not yet complete (2026-06-16, `main`).** Epic Phase 3 added the detach-grace retention window, replay-on-reconnect from the bounded replay ring, and the `ReplayGap` signal on `MxEvent` (Tasks 10–12, merged). Still pending: owner re-validation on reconnect (Task 13), client `ReplayGap` handling across all five clients (Task 14), and the fake-worker reconnect integration test (Task 15) — so reconnect is **not** safe to rely on end-to-end yet. Overturns `docs/DesignDecisions.md:63-73`. See `docs/plans/2026-06-15-session-resilience.md` Phase 3.
- **Multi-event-subscriber fan-out — RESOLVED (2026-06-16, `main`).** Epic Phase 2 (Tasks 7–9) removed the `GatewayOptionsValidator` rejection, added a `MaxEventSubscribersPerSession` cap (default 8), and built fan-out on the new `SessionEventDistributor` (single pump → N bounded per-subscriber channels with per-subscriber backpressure), with FakeWorkerHarness end-to-end coverage. Was: validator-blocked. `docs/plans/2026-06-15-session-resilience.md` Phase 2.
- 🟡 **Gateway restart does not reattach orphan workers — still true on `main`, planned (epic Phase 5, not started).** Workers are still terminated on startup. Epic Phase 5 (Tasks 20–28) is designed to reverse this — stable pipe naming, a SQLite adoption manifest, a worker adopt/reconnect proto frame, and nonce-validated re-adoption behind an `EnableOrphanReattach` flag (default off). Reverses the hard rule in `docs/DesignDecisions.md:65-69` and `CLAUDE.md`. `docs/plans/2026-06-15-session-resilience.md` Phase 5.
- 🔵 **Workers run as the gateway service identity** — restricted service account is a reserved extension point. `docs/DesignDecisions.md:179-184`.
- 🔵 **Fail-fast event backpressure, no coalescing** — opt-in coalescing is post-v1. `docs/DesignDecisions.md:187-203`.
- 🔵 **No public command batching** — `docs/DesignDecisions.md:206-212`.
@@ -60,7 +62,7 @@ These are documented, deliberate, and mostly enforced. Listed so the deferred su
- 🔵 **Lazy browse is wire-only** — no lazy SQL / cache loading. `docs/DesignDecisions.md:365-376`, `docs/plans/2026-05-28-lazy-browse-design.md:30`.
- 🔵 **No server-side / streaming browse search** — `docs/plans/2026-05-28-lazy-browse-design.md:208`.
- 🔵 **Alarm command surface is ack + query only** — no Clear/Disable/Enable/Silence/Shelve/Inhibit; matches the MXAccess alarm-client set. `Worker/MxAccess/AlarmCommandHandler.cs`, shelve/suppress out of scope per `docs/AlarmClientDiscovery.md:60-66`.
- 🔵 **Dashboard EventsHub has no per-session ACL** — any authenticated dashboard user may subscribe to any session group. `Dashboard/Hubs/EventsHub.cs:36-50` (`TODO(per-session-acl)`); only relevant once a per-session role model exists.
- 🟡 **Dashboard EventsHub has no per-session ACL — still true on `main`, planned (epic Phase 4, not started).** Any authenticated dashboard user may still subscribe to any session group (`Dashboard/Hubs/EventsHub.cs` `TODO(per-session-acl)`). The enabling foundation (session `OwnerKeyId`) already merged in epic Phase 1; epic Phase 4 (Tasks 16–19) adds the gRPC session-owner gate, a session tag + group-to-tag config, and EventsHub per-session ACL with a hub-token tag claim. `docs/plans/2026-06-15-session-resilience.md` Phase 4. (See also §8.)
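
The "single pump → N bounded per-subscriber channels" shape named in the multi-subscriber item above can be sketched roughly as follows. This is a hypothetical illustration only: `FanOutSketch<T>`, its members, and the policy of faulting a subscriber whose channel is full are assumptions made for the example, not the actual `SessionEventDistributor` API or its backpressure rules.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Two subscribers share one publisher; each gets its own bounded buffer.
var fanOut = new FanOutSketch<string>(capacityPerSubscriber: 2);
ChannelReader<string> fast = fanOut.Subscribe();
ChannelReader<string> slow = fanOut.Subscribe();

for (int i = 1; i <= 3; i++)
{
    fanOut.Publish($"event-{i}");
    fast.TryRead(out _); // the fast subscriber drains as events arrive
}

// The slow subscriber never drained, so its buffer overflowed on its own
// without stalling the fast subscriber.
int slowBuffered = 0;
while (slow.TryRead(out _))
{
    slowBuffered++;
}

Console.WriteLine($"slow subscriber buffered {slowBuffered} events before it was dropped");

// Hypothetical fan-out helper (assumption: a full subscriber is failed fast).
public sealed class FanOutSketch<T>(int capacityPerSubscriber)
{
    private readonly object _gate = new();
    private readonly List<Channel<T>> _subscribers = [];

    public ChannelReader<T> Subscribe()
    {
        Channel<T> channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacityPerSubscriber)
        {
            // With Wait mode, TryWrite returns false instead of blocking when full.
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = true,
        });

        lock (_gate)
        {
            _subscribers.Add(channel);
        }

        return channel.Reader;
    }

    public void Publish(T item)
    {
        lock (_gate)
        {
            // Iterate backwards so a lagging subscriber can be removed in place.
            for (int i = _subscribers.Count - 1; i >= 0; i--)
            {
                if (!_subscribers[i].Writer.TryWrite(item))
                {
                    _subscribers[i].Writer.TryComplete(
                        new InvalidOperationException("Subscriber fell behind."));
                    _subscribers.RemoveAt(i);
                }
            }
        }
    }
}
```

The point of the per-subscriber channel is isolation: a subscriber that stops reading only exhausts its own buffer, so the single pump never has to wait on the slowest consumer.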
---
@@ -121,7 +123,7 @@ No placeholder/empty/`Assert.True(true)` tests were found anywhere.
## 6. Config-gated functional gaps (work only after configuration)
- 🟠 **6.1 Alarm ack in subtag mode requires `AckComment` subtag configured** — empty by default; ack fails in subtag mode until set. Names must be validated against live MXAccess, not guessed. `docs/DesignDecisions.md:454-458`. (`AckCommentSubtag` is write-only; `Worker/MxAccess/SubtagAlarmStateMachine.cs:21`.)
- 🔵 **6.2 Multi-subscriber** — see §2 (option exists, validator-blocked).
- **6.2 Multi-subscriber — RESOLVED** (validator block removed; fan-out implemented — see §2).
---
@@ -141,10 +143,12 @@ No placeholder/empty/`Assert.True(true)` tests were found anywhere.
## 8. Deferred test-coverage follow-ups (noted in resolutions, never filed as findings)
- **Java CLI bulk-subcommand coverage** — 6 of 13 non-trivial subcommands untested: `read-bulk`, `write-bulk`, `write2-bulk`, `write-secured-bulk`, `write-secured2-bulk`, `bench-read-bulk` (plus `stream-events`, the four `galaxy-*`, `close-session`). `code-reviews/Client.Java/findings.md:495` (Client.Java-026).
- **Per-session-ACL TODO** at `Server/Dashboard/Hubs/EventsHub.cs` (`code-reviews/Server/findings.md:765`).
- **Worker-Ready retry race** noted at `code-reviews/Server/findings.md:611`.
- **Duplicated `FakeWorkerProcess` harness** flagged as a latent regression vector — `code-reviews/Tests/findings.md:463`.
> **Resolution update (2026-06-16, branch `feat/stillpending-section8`):** Three of the four items below are **Resolved** per `docs/plans/2026-06-16-stillpending-section8.md` (Java CLI coverage, Worker-Ready wait, duplicated `FakeWorkerProcess`). Java work verified on the windev toolchain (full cli suite green); C# verified locally. The remaining item (per-session-ACL TODO) was correctly out of scope — it is session-resilience epic Phase 4.
- **Java CLI bulk-subcommand coverage — RESOLVED.** All 10 previously-untested subcommands now have CLI-layer round-trip tests in `clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java`: the bulk family (`read-bulk`, `write-bulk`, `write2-bulk`, `write-secured-bulk`, `write-secured2-bulk`, `bench-read-bulk`) + `close-session` via the `FakeSession` seam; `stream-events`, `galaxy-discover`, `galaxy-watch` via a new in-process gRPC harness (`InProcessGatewayHarness`) that drives the real client over an in-process channel. A `GalaxyClientFactory` seam was added to `MxGatewayCli` (behavior-preserving default) to make the galaxy commands testable. Was: `code-reviews/Client.Java/findings.md:495` (Client.Java-026).
- 🟡 **Per-session-ACL TODO** at `Server/Dashboard/Hubs/EventsHub.cs` (`code-reviews/Server/findings.md:765`) — still scheduled as session-resilience epic Phase 4 (Tasks 16–19); not yet started. (Out of scope for the §8 completion branch.)
- ✅ **Worker-Ready retry race — RESOLVED.** `GatewaySession.GetReadyWorkerClient` is now an opt-in bounded ready-wait (`GetReadyWorkerClientAsync` + `MxGateway:Sessions:WorkerReadyWaitTimeoutMs`, **default `0` = unchanged fail-fast**): a session that is `Ready` while its worker is transiently `Handshaking`/`Created` waits up to the configured window for `Ready`; terminal worker states fail fast immediately with the both-states diagnostic preserved. Was: `code-reviews/Server/findings.md:611`.
- ✅ **Duplicated `FakeWorkerProcess` harness — RESOLVED.** The three private nested `FakeWorkerProcess` copies were consolidated onto the canonical `src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs` (one definition remains). Was a latent regression vector — `code-reviews/Tests/findings.md:463`.
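
As a hedged illustration of the opt-in ready-wait setting named above: assuming the gateway binds `MxGateway:Sessions:WorkerReadyWaitTimeoutMs` through the standard .NET JSON configuration provider (where `:` denotes nesting), an `appsettings.json` fragment enabling it might look like this. The `2000` value is hypothetical; omitting the key or setting `0` keeps the unchanged fail-fast default.

```json
{
  "MxGateway": {
    "Sessions": {
      "WorkerReadyWaitTimeoutMs": 2000
    }
  }
}
```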
---
@@ -157,6 +161,6 @@ No placeholder/empty/`Assert.True(true)` tests were found anywhere.
- **§1.4 / §3.4 / §3.5** — the AVEVA 8-arg `AlarmAckByName` is a vendor stub (55) and `AlarmAckByGUID` is `E_NOTIMPL`; the `domain`/`full_name` fields stay forward-compat-only until AVEVA implements them.
- **§3.2** — buffered commands work and the empty bootstrap converts cleanly live, but a multi-sample buffered batch is undrivable on the rig (unit-tested only).
- **§3.1 / §3.3 / §3.6 / §3.7** — await live MXAccess captures.
- **§2** — deliberate v1 scope. **§5** — opt-in verification gates. **§7.6** — accepted `Won't Fix` review findings.
- **§2** — mostly deliberate v1 scope, but the session-resilience epic (12/28 tasks merged to `main`) has since **resolved multi-subscriber fan-out** and landed the **reconnect server-side core**; reconnect Tasks 13–15, per-session ACL (Phase 4), and orphan-worker reattach (Phase 5) remain (see `docs/plans/2026-06-15-session-resilience.md`, `oldtasks.md`). **§5** — opt-in verification gates. **§7.6** — accepted `Won't Fix` review findings.
MXAccess **event/data/value/write** mapping, the **Galaxy** RPC surface, and now the **full command surface** are complete; no `NotImplementedException`s, stubbed RPC bodies, or empty tests remain in the production paths.