Compare commits

...

5 Commits

Author SHA1 Message Date
Joseph Doherty 6a4833bd32 Regenerate code-reviews index after Medium findings Batch B
Reflects resolution of Tests-003..006, Worker.Tests-003..007,
IntegrationTests-003..006, Client.Python-003/005/009.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:45:29 -04:00
Joseph Doherty e4fbbb541a Resolve Client.Python-003, -005, -009 code-review findings
Client.Python-003: stream_events_raw and query_active_alarms passed `timeout`
to the stub with no TypeError fallback, unlike _unary. Both now route through
a shared _open_stream helper that strips `timeout` on TypeError.

Client.Python-005: discover_hierarchy buffered the entire Galaxy hierarchy in
memory. Added GalaxyRepositoryClient.iter_hierarchy, a lazy async generator
yielding objects page-by-page; discover_hierarchy is now a thin wrapper that
preserves its list contract. README documents iter_hierarchy.

Client.Python-009: added regression coverage for previously untested paths —
write2/add_item2 request shape, the MAX_BULK_ITEMS boundary, the None-argument
TypeError guards, TLS ca_file reading, and the non-auth map_rpc_error fallthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:45:16 -04:00
Joseph Doherty f13f35bc79 Resolve IntegrationTests-003..006 code-review findings
IntegrationTests-003: the live MXAccess smoke test asserted on the first
streamed event, which a registration/quality bootstrap event could occupy.
The recording writer now waits for the first event matching a predicate
(Family == OnDataChange).

IntegrationTests-004: the cleanup `finally` could throw and mask an original
assertion failure. Shutdown now routes through a helper that logs cleanup
exceptions instead of propagating them.

IntegrationTests-005: added live MXAccess parity tests — a Write round-trip
to an advised item, and an invalid-handle command surfacing the MXAccess
failure without a transport fault.

IntegrationTests-006: added live LDAP failure-path tests — wrong password
(no password leak), unknown username, and server-unreachable.

docs/GatewayTesting.md updated to describe the new cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:45:11 -04:00
Joseph Doherty 18ce2922e2 Resolve Worker.Tests-003..007 code-review findings
Worker.Tests-003: removed the wall-clock `Elapsed < 2s` assertion from
InvokeAsync_WakesIdlePumpForQueuedCommand; the awaited completion against a
30s idle period already proves the wake event drove dispatch.

Worker.Tests-004: MxAccessStaSession.Dispose now joins the alarm poll task
after cancelling the CTS (consistent with ShutdownGracefullyAsync), and
Dispose_StopsAlarmPollLoop asserts deterministically instead of via Task.Delay.

Worker.Tests-005: undisposed MemoryStream instances across the frame-protocol
and pipe-session tests are now `using` declarations.

Worker.Tests-006: Dispose_StopsAlarmPollLoop now constructs MxAccessStaSession
with `using` so a failed assertion cannot leak the STA poll loop.

Worker.Tests-007: docs/WorkerFrameProtocol.md verification section corrected
to target MxGateway.Worker.Tests / MxGateway.Worker with -p:Platform=x86.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:45:01 -04:00
Joseph Doherty 5ade3f4f48 Resolve Tests-003, -004, -005, -006 code-review findings
Tests-003: temp auth-DB directories leaked under %TEMP%. Added the
TempDatabaseDirectory IDisposable helper (clears the Sqlite connection pool,
then recursively deletes); SqliteAuthStoreTests and ApiKeyAdminCliRunnerTests
now dispose every directory they create.

Tests-004: added end-to-end coverage composing the real authorization
interceptor in front of the real MxAccessGatewayService, plus scope-resolver
tests confirming an unmapped request type fails closed to the admin scope.

Tests-005: added coverage for a worker faulting mid-command — a pipe
disconnect and a worker fault while an InvokeAsync is in flight both fail the
pending invoke. No product change needed.

Tests-006 (re-triaged): the flaky ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess
is a test race, not a product bug — the kill runs synchronously inside
SetFaulted. Rewrote it to await FakeWorkerProcess exit deterministically, and
replaced fixed Task.Delay timing in the late-reply and heartbeat tests with
FIFO ordering and an injected ManualTimeProvider.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:44:55 -04:00
27 changed files with 1591 additions and 144 deletions
+19
View File
@@ -131,6 +131,25 @@ The methods return native Python types (`bool`, `datetime | None`, and a
into the hierarchy without learning the underlying stub class. The
service requires the `metadata:read` scope on the API key.
`discover_hierarchy` buffers every object (with its full attribute list)
into a single in-memory `list`. For a large Galaxy use `iter_hierarchy`
instead — it is an async generator that fetches one page at a time and
yields objects as they arrive, so peak memory stays bounded by a single
page rather than the whole hierarchy:
```python
async with await GalaxyRepositoryClient.connect(
endpoint="localhost:5000",
api_key="<gateway-api-key>",
plaintext=True,
) as galaxy:
async for obj in galaxy.iter_hierarchy():
print(obj.tag_name, obj.contained_name)
```
Pages are fetched lazily: the next page is only requested once the
caller has consumed every object from the current page.
### Watching deploy events
`GalaxyRepositoryClient.watch_deploy_events` opens a server-streaming
+19 -2
View File
@@ -133,7 +133,7 @@ class GatewayClient:
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
if self.options.stream_timeout is not None:
kwargs["timeout"] = self.options.stream_timeout
call = self.raw_stub.StreamEvents(request, **kwargs)
call = _open_stream(self.raw_stub.StreamEvents, request, kwargs)
return _canceling_iterator(call)
async def acknowledge_alarm(
@@ -169,7 +169,7 @@ class GatewayClient:
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
if self.options.stream_timeout is not None:
kwargs["timeout"] = self.options.stream_timeout
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
call = _open_stream(self.raw_stub.QueryActiveAlarms, request, kwargs)
return _canceling_active_alarms_iterator(call)
async def _unary(
@@ -201,6 +201,23 @@ class GatewayClient:
raise map_rpc_error(operation, error) from error
def _open_stream(method: Any, request: Any, kwargs: dict[str, Any]) -> Any:
"""Open a server-streaming call, dropping ``timeout`` if the stub rejects it.
Mirrors the fallback in ``_unary`` so an older or fake stub that does not
accept a ``timeout`` keyword argument does not crash when ``stream_timeout``
is configured.
"""
try:
return method(request, **kwargs)
except TypeError as error:
if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error):
raise
kwargs.pop("timeout")
return method(request, **kwargs)
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
try:
async for event in call:
+23 -5
View File
@@ -114,10 +114,17 @@ class GalaxyRepositoryClient:
return None
return reply.time_of_last_deploy.ToDatetime()
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
async def iter_hierarchy(self) -> AsyncIterator[galaxy_pb.GalaxyObject]:
"""Yield the deployed Galaxy object hierarchy one object at a time.
Pages are fetched lazily: a page is only requested once the caller has
consumed every object from the previous page. This keeps peak memory
bounded by a single page (``_DISCOVER_HIERARCHY_PAGE_SIZE`` objects)
rather than the whole Galaxy. Use this for large Galaxies; use
:meth:`discover_hierarchy` when a fully buffered ``list`` is convenient
and the Galaxy is known to be small.
"""
objects: list[galaxy_pb.GalaxyObject] = []
seen_page_tokens: set[str] = set()
page_token = ""
while True:
@@ -129,16 +136,27 @@ class GalaxyRepositoryClient:
page_token=page_token,
),
)
objects.extend(reply.objects)
for obj in reply.objects:
yield obj
page_token = reply.next_page_token
if not page_token:
return objects
return
if page_token in seen_page_tokens:
raise MxGatewayError(
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
)
seen_page_tokens.add(page_token)
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
"""Return the deployed Galaxy object hierarchy as raw proto messages.
This buffers every object (and its full attribute list) into a single
in-memory ``list``. For a large Galaxy prefer :meth:`iter_hierarchy`,
which streams objects page by page without holding the whole hierarchy.
"""
return [obj async for obj in self.iter_hierarchy()]
def watch_deploy_events(
self,
last_seen_deploy_time: datetime | None = None,
+284
View File
@@ -0,0 +1,284 @@
"""Regression tests for Client.Python-009: untested public paths.
Covers `Session.write2`/`add_item2` request construction, the bulk-size limit
guard, the ``None``-argument ``TypeError`` guards, the TLS ``ca_file`` read
path in `create_channel`, the generic `map_rpc_error` fallthrough, and a
happy-path CLI command body driven by a fake stub.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
import grpc
import pytest
from click.testing import CliRunner
from mxgateway import ClientOptions, GatewayClient
from mxgateway.errors import MxGatewayTransportError, map_rpc_error
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import create_channel
from mxgateway.session import MAX_BULK_ITEMS, Session
class _FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = list(replies)
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class _FakeGatewayStub:
def __init__(self) -> None:
self.open_session = _FakeUnary(
[
pb.OpenSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.invoke = _FakeUnary([])
self.OpenSession = self.open_session
self.Invoke = self.invoke
def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
return pb.MxCommandReply(
session_id="session-1",
kind=kind,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
**fields,
)
# --- write2 / add_item2 request construction -------------------------------
@pytest.mark.asyncio
async def test_add_item2_sends_item_context_and_returns_handle() -> None:
stub = _FakeGatewayStub()
stub.invoke.replies = [
_ok_reply(pb.MX_COMMAND_KIND_ADD_ITEM2, add_item2=pb.AddItem2Reply(item_handle=77)),
]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
session = await client.open_session()
item_handle = await session.add_item2(12, "Object.Attribute", "ctx-A")
assert item_handle == 77
command = stub.invoke.requests[0].command
assert command.kind == pb.MX_COMMAND_KIND_ADD_ITEM2
assert command.add_item2.server_handle == 12
assert command.add_item2.item_definition == "Object.Attribute"
assert command.add_item2.item_context == "ctx-A"
@pytest.mark.asyncio
async def test_write2_sends_value_and_timestamp_value() -> None:
stub = _FakeGatewayStub()
stub.invoke.replies = [_ok_reply(pb.MX_COMMAND_KIND_WRITE2)]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
session = await client.open_session()
when = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
await session.write2(12, 34, 123, when, user_id=5)
command = stub.invoke.requests[0].command
assert command.kind == pb.MX_COMMAND_KIND_WRITE2
assert command.write2.server_handle == 12
assert command.write2.item_handle == 34
assert command.write2.user_id == 5
# The integer value is carried as the int32 field of the MxValue oneof.
assert command.write2.value.WhichOneof("kind") == "int32_value"
assert command.write2.value.int32_value == 123
# The timestamp value carries the datetime via the timestamp_value oneof.
assert command.write2.timestamp_value.WhichOneof("kind") == "timestamp_value"
assert command.write2.timestamp_value.timestamp_value.ToDatetime(
tzinfo=timezone.utc,
) == when
# --- bulk-size limit + None-argument guards --------------------------------
@pytest.mark.asyncio
async def test_subscribe_bulk_rejects_oversized_request() -> None:
stub = _FakeGatewayStub()
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
session = await client.open_session()
oversized = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS + 1)]
with pytest.raises(ValueError, match=str(MAX_BULK_ITEMS)):
await session.subscribe_bulk(12, oversized)
# No RPC should have been issued for a rejected request.
assert stub.invoke.requests == []
@pytest.mark.asyncio
async def test_advise_item_bulk_rejects_none_argument() -> None:
stub = _FakeGatewayStub()
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
session = await client.open_session()
with pytest.raises(TypeError, match="item_handles is required"):
await session.advise_item_bulk(12, None) # type: ignore[arg-type]
@pytest.mark.asyncio
async def test_add_item_bulk_at_limit_is_allowed() -> None:
stub = _FakeGatewayStub()
stub.invoke.replies = [
_ok_reply(
pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
add_item_bulk=pb.BulkSubscribeReply(results=[]),
),
]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
session = await client.open_session()
at_limit = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS)]
results = await session.add_item_bulk(12, at_limit)
assert results == []
assert len(stub.invoke.requests) == 1
assert len(stub.invoke.requests[0].command.add_item_bulk.tag_addresses) == MAX_BULK_ITEMS
# --- TLS ca_file read path -------------------------------------------------
@pytest.mark.asyncio
async def test_create_channel_reads_ca_file(tmp_path: Any) -> None:
ca_path = tmp_path / "ca.pem"
ca_path.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
channel = create_channel(
ClientOptions(
endpoint="mxgateway.example.local:5001",
ca_file=str(ca_path),
server_name_override="mxgateway.example.local",
),
)
# A secure channel object is returned without raising; the ca_file was read.
assert channel is not None
await channel.close()
def test_create_channel_missing_ca_file_raises() -> None:
with pytest.raises(FileNotFoundError):
create_channel(
ClientOptions(
endpoint="mxgateway.example.local:5001",
ca_file="C:/does/not/exist/ca.pem",
),
)
# --- map_rpc_error generic fallthrough -------------------------------------
class _FakeRpcError(grpc.RpcError):
def __init__(self, code: grpc.StatusCode, details: str) -> None:
self._code = code
self._details = details
def code(self) -> grpc.StatusCode:
return self._code
def details(self) -> str:
return self._details
def test_map_rpc_error_generic_branch_returns_transport_error() -> None:
error = _FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused")
mapped = map_rpc_error("invoke", error)
assert type(mapped) is MxGatewayTransportError
assert "invoke failed: connection refused" in str(mapped)
def test_map_rpc_error_handles_error_without_code() -> None:
mapped = map_rpc_error("invoke", grpc.RpcError())
assert type(mapped) is MxGatewayTransportError
assert "invoke failed:" in str(mapped)
# --- happy-path CLI command body -------------------------------------------
def test_cli_register_happy_path_emits_server_handle(monkeypatch: Any) -> None:
"""Drive the `register` CLI command end to end against a fake stub."""
from mxgateway_cli import commands
invoke = _FakeUnary(
[
_ok_reply(
pb.MX_COMMAND_KIND_REGISTER,
register=pb.RegisterReply(server_handle=99),
),
],
)
class _Stub:
def __init__(self) -> None:
self.Invoke = invoke
async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
return await GatewayClient.connect(
ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
stub=_Stub(),
)
monkeypatch.setattr(commands, "_connect", _fake_connect)
runner = CliRunner()
result = runner.invoke(
commands.main,
[
"register",
"--endpoint",
"localhost:5000",
"--session-id",
"session-1",
"--client-name",
"pytest-client",
"--json",
],
)
assert result.exit_code == 0, result.output
assert json.loads(result.output) == {"serverHandle": 99}
assert invoke.requests[0].command.register.client_name == "pytest-client"
@@ -0,0 +1,127 @@
"""Regression tests for Client.Python-005: streaming hierarchy iteration.
`GalaxyRepositoryClient.iter_hierarchy` yields objects page by page instead of
buffering the entire Galaxy hierarchy in memory, and `discover_hierarchy`
remains a convenience wrapper built on top of it.
"""
from __future__ import annotations
from typing import Any
import pytest
from mxgateway import ClientOptions, GalaxyRepositoryClient
from mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
class _FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = list(replies)
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
timeout: float | None = None,
) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class _FakeGalaxyStub:
def __init__(self, discover_replies: list[Any]) -> None:
self.DiscoverHierarchy = _FakeUnary(discover_replies)
def _two_page_replies() -> list[galaxy_pb.DiscoverHierarchyReply]:
return [
galaxy_pb.DiscoverHierarchyReply(
next_page_token="page-2",
total_object_count=3,
objects=[
galaxy_pb.GalaxyObject(gobject_id=1, tag_name="Area_001", is_area=True),
galaxy_pb.GalaxyObject(gobject_id=2, tag_name="Pump_001"),
],
),
galaxy_pb.DiscoverHierarchyReply(
total_object_count=3,
objects=[
galaxy_pb.GalaxyObject(gobject_id=3, tag_name="Pump_002"),
],
),
]
@pytest.mark.asyncio
async def test_iter_hierarchy_yields_objects_across_pages() -> None:
stub = _FakeGalaxyStub(_two_page_replies())
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
tags = [obj.tag_name async for obj in client.iter_hierarchy()]
assert tags == ["Area_001", "Pump_001", "Pump_002"]
assert len(stub.DiscoverHierarchy.requests) == 2
assert stub.DiscoverHierarchy.requests[0].page_token == ""
assert stub.DiscoverHierarchy.requests[1].page_token == "page-2"
@pytest.mark.asyncio
async def test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page() -> None:
"""Pulling only the first object must not have requested the second page."""
stub = _FakeGalaxyStub(_two_page_replies())
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
iterator = client.iter_hierarchy()
first = await iterator.__anext__()
assert first.tag_name == "Area_001"
# Only the first page should have been fetched so far.
assert len(stub.DiscoverHierarchy.requests) == 1
await iterator.aclose()
@pytest.mark.asyncio
async def test_iter_hierarchy_rejects_repeated_page_token() -> None:
stub = _FakeGalaxyStub(
[
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
],
)
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
with pytest.raises(Exception, match="repeated page token"):
async for _ in client.iter_hierarchy():
pass
@pytest.mark.asyncio
async def test_discover_hierarchy_still_returns_full_list() -> None:
"""The convenience wrapper must keep returning a buffered list."""
stub = _FakeGalaxyStub(_two_page_replies())
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
objects = await client.discover_hierarchy()
assert isinstance(objects, list)
assert [obj.tag_name for obj in objects] == ["Area_001", "Pump_001", "Pump_002"]
@@ -0,0 +1,132 @@
"""Regression tests for Client.Python-003: stream timeout-kwarg fallback.
`stream_events_raw` and `query_active_alarms` must tolerate a fake/older stub
that does not accept a ``timeout`` keyword argument, matching the fallback
already present in `galaxy.watch_deploy_events` and the unary `_unary` helper.
"""
from __future__ import annotations
from typing import Any
import pytest
from mxgateway import ClientOptions, GatewayClient
from mxgateway.generated import mxaccess_gateway_pb2 as pb
class _NoTimeoutStream:
"""Sync-callable unary-stream fake that rejects a ``timeout`` kwarg."""
def __init__(self, replies: list[Any]) -> None:
self._replies = list(replies)
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
self.cancelled = False
def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> "_NoTimeoutStream":
self.requests.append(request)
self.metadata = metadata
return self
def __aiter__(self) -> "_NoTimeoutStream":
return self
async def __anext__(self) -> Any:
if not self._replies:
raise StopAsyncIteration
return self._replies.pop(0)
def cancel(self) -> None:
self.cancelled = True
class _NoTimeoutStubStreamEvents:
def __init__(self, stream: _NoTimeoutStream) -> None:
self.StreamEvents = stream
class _NoTimeoutStubQueryAlarms:
def __init__(self, stream: _NoTimeoutStream) -> None:
self.QueryActiveAlarms = stream
@pytest.mark.asyncio
async def test_stream_events_raw_falls_back_when_stub_rejects_timeout() -> None:
stream = _NoTimeoutStream(
[pb.MxEvent(session_id="session-1", worker_sequence=1)],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
stub=_NoTimeoutStubStreamEvents(stream),
)
received = [
event
async for event in client.stream_events_raw(
pb.StreamEventsRequest(session_id="session-1"),
)
]
assert len(received) == 1
assert received[0].worker_sequence == 1
@pytest.mark.asyncio
async def test_query_active_alarms_falls_back_when_stub_rejects_timeout() -> None:
stream = _NoTimeoutStream(
[pb.ActiveAlarmSnapshot(alarm_full_reference="Tank01.Level.HiHi")],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
stub=_NoTimeoutStubQueryAlarms(stream),
)
received = [
snapshot
async for snapshot in client.query_active_alarms(
pb.QueryActiveAlarmsRequest(session_id="session-1"),
)
]
assert len(received) == 1
assert received[0].alarm_full_reference == "Tank01.Level.HiHi"
@pytest.mark.asyncio
async def test_stream_events_raw_still_passes_timeout_to_capable_stub() -> None:
"""A stub that accepts ``timeout`` must still receive the configured value."""
captured: dict[str, Any] = {}
class _CapableStream(_NoTimeoutStream):
def __call__( # type: ignore[override]
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
timeout: float | None = None,
) -> "_CapableStream":
captured["timeout"] = timeout
return super().__call__(request, metadata=metadata)
stream = _CapableStream([pb.MxEvent(session_id="session-1", worker_sequence=9)])
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=7.5),
stub=_NoTimeoutStubStreamEvents(stream),
)
received = [
event
async for event in client.stream_events_raw(
pb.StreamEventsRequest(session_id="session-1"),
)
]
assert len(received) == 1
assert captured["timeout"] == 7.5
+7 -7
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `3cc53a8` |
| Status | Reviewed |
| Open findings | 12 |
| Open findings | 9 |
## Checklist coverage
@@ -63,13 +63,13 @@
| Severity | Medium |
| Category | Error handling & resilience |
| Location | `clients/python/src/mxgateway/client.py:125-137,155-173` |
| Status | Open |
| Status | Resolved |
**Description:** `stream_events_raw` and `query_active_alarms` call the stub directly with a `timeout` kwarg when `stream_timeout` is set, with no `TypeError` fallback. `galaxy.py:watch_deploy_events` and `_unary` *do* have a fallback that strips `timeout` if the callable rejects it. This asymmetry means a fake/older stub that does not accept `timeout` crashes for gateway streams but not Galaxy streams. It is only masked today because `stream_timeout` defaults to `None`.
**Recommendation:** Apply the same `try/except TypeError` timeout-fallback pattern to `stream_events_raw` and `query_active_alarms`, or remove the fallback everywhere and standardise on a single behaviour.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — Confirmed: both stream methods in `client.py` called the stub with `timeout` unconditionally and had no `TypeError` fallback, unlike `_unary` and `galaxy.watch_deploy_events`. Added a shared `_open_stream` helper in `client.py` that opens a server-streaming call and strips the `timeout` kwarg when the stub raises `TypeError: ... unexpected keyword argument 'timeout'`, then routed both `stream_events_raw` and `query_active_alarms` through it. Regression tests in `tests/test_stream_timeout_fallback.py` (`test_stream_events_raw_falls_back_when_stub_rejects_timeout`, `test_query_active_alarms_falls_back_when_stub_rejects_timeout`, `test_stream_events_raw_still_passes_timeout_to_capable_stub`) failed before the fix and pass after. No public behaviour change for real gRPC stubs, so no README update needed.
### Client.Python-004
@@ -93,13 +93,13 @@
| Severity | Medium |
| Category | Performance & resource management |
| Location | `clients/python/src/mxgateway/galaxy.py:117-140` |
| Status | Open |
| Status | Resolved |
**Description:** `discover_hierarchy` pages through the entire Galaxy object hierarchy and accumulates every `GalaxyObject` (each carrying its full attribute list) into a single in-memory `list` before returning. For a large Galaxy this is a very large allocation with no streaming alternative and no caller-side bound.
**Recommendation:** Offer an async-generator variant (e.g. `iter_hierarchy()`) that yields objects/pages as they arrive, keeping `discover_hierarchy()` as a convenience wrapper. At minimum document the memory characteristic.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — Confirmed: `discover_hierarchy` buffered the entire hierarchy with no streaming alternative. Added `GalaxyRepositoryClient.iter_hierarchy`, an async generator that fetches one `DiscoverHierarchyRequest` page at a time and yields each `GalaxyObject` as it arrives, so peak memory is bounded by a single page (`_DISCOVER_HIERARCHY_PAGE_SIZE`). Pages are fetched lazily — the next page is only requested after the current page is fully consumed. `discover_hierarchy` is now a thin convenience wrapper (`[obj async for obj in self.iter_hierarchy()]`) that preserves its `list[GalaxyObject]` contract, including the repeated-page-token guard. Regression tests in `tests/test_galaxy_iter_hierarchy.py` (`test_iter_hierarchy_yields_objects_across_pages`, `test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page`, `test_iter_hierarchy_rejects_repeated_page_token`, `test_discover_hierarchy_still_returns_full_list`) failed before the fix and pass after. `clients/python/README.md` updated with the `iter_hierarchy` usage and memory guidance since this adds a new public method.
### Client.Python-006
@@ -153,13 +153,13 @@
| Severity | Medium |
| Category | Testing coverage |
| Location | `clients/python/tests/` |
| Status | Open |
| Status | Resolved |
**Description:** Several non-trivial public paths are untested: `Session.write2`/`add_item2` request construction; the bulk-size limit `_ensure_bulk_size`/`MAX_BULK_ITEMS` guard; the `None`-argument `TypeError` guards in bulk methods; the TLS `ca_file` read path in `create_channel`; most CLI command bodies; and `map_rpc_error`'s default (non-auth) branch.
**Recommendation:** Add tests for `write2`/`add_item2` request shape, the bulk-size `ValueError`, the `ca_file` TLS branch, the generic `map_rpc_error` fallthrough, and at least one happy-path CLI command using a fake stub.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — Confirmed coverage gap against the existing `tests/` files. Added `tests/test_coverage_gaps.py` covering every path the finding lists: `test_add_item2_sends_item_context_and_returns_handle` and `test_write2_sends_value_and_timestamp_value` (request shape + `MxValue` oneof), `test_subscribe_bulk_rejects_oversized_request` and `test_add_item_bulk_at_limit_is_allowed` (the `MAX_BULK_ITEMS` `_ensure_bulk_size` boundary), `test_advise_item_bulk_rejects_none_argument` (the `None`-argument `TypeError` guard), `test_create_channel_reads_ca_file` and `test_create_channel_missing_ca_file_raises` (the TLS `ca_file` read path), `test_map_rpc_error_generic_branch_returns_transport_error` and `test_map_rpc_error_handles_error_without_code` (the non-auth `map_rpc_error` fallthrough and the no-`code` path), and `test_cli_register_happy_path_emits_server_handle` (a happy-path CLI command body driven end to end through `CliRunner` with a fake stub via a monkeypatched `_connect`). All 10 new tests pass. No source change required — this is a pure coverage finding.
### Client.Python-010
+9 -9
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `6c64030` |
| Status | Reviewed |
| Open findings | 8 |
| Open findings | 4 |
## Checklist coverage
@@ -63,13 +63,13 @@
| Severity | Medium |
| Category | Correctness & logic bugs |
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` |
| Status | Open |
| Status | Resolved |
**Description:** The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registration-state or bad-quality bootstrap event). Because `WaitForFirstMessageAsync` returns whatever arrives first, a genuine ordering/family defect could fail spuriously or leave later wrong events unverified.
**Recommendation:** Filter for the first event with `Family == OnDataChange` (with a bounded retry/poll) or assert the full recorded sequence, so the test verifies the event the worker is supposed to emit.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: Confirmed against source — `WaitForFirstMessageAsync` completed a `TaskCompletionSource` on the very first `WriteAsync`. Replaced it with `RecordingServerStreamWriter.WaitForMessageAsync(predicate, timeout)`, which scans recorded messages, skips earlier non-matching events, and blocks on a `SemaphoreSlim` until a matching one arrives or the timeout elapses (throwing a `TimeoutException` that reports the scanned count). `GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses` now waits for the first `Family == OnDataChange` event. Live execution was not possible in this environment (no MXAccess COM); verified by build.
### IntegrationTests-004
@@ -78,13 +78,13 @@
| Severity | Medium |
| Category | Error handling & resilience |
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` |
| Status | Open |
| Status | Resolved |
**Description:** In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutException` from inside `finally`, which replaces/masks any original assertion failure from the `try` block. The diagnostic value of the real failure is lost.
**Recommendation:** Wrap the `streamTask.WaitAsync` (and ideally `WaitForProcessesAsync`) in a try/catch that logs the cleanup exception via `output.WriteLine` instead of letting it propagate.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: Confirmed — the `finally` block awaited `streamTask.WaitAsync` and `WaitForProcessesAsync` with no exception handling. Extracted a shared `ShutDownAsync` helper that wraps the session-close + stream-drain in one try/catch and the worker-process wait in a second try/catch, logging each cleanup exception via `output.WriteLine` instead of throwing. All three live tests now route shutdown through it, so a cleanup timeout can no longer mask an assertion failure. Live execution was not possible in this environment; verified by build.
### IntegrationTests-005
@@ -93,13 +93,13 @@
| Severity | Medium |
| Category | Testing coverage |
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` |
| Status | Open |
| Status | Resolved |
**Description:** The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` semantics, invalid-handle exceptions). None of `Write`, `WriteSecured`, `Unadvise`, `RemoveItem`, `Unregister`, `OperationComplete`, an invalid-handle command, or a worker-fault path is exercised against live COM — exactly the paths fake-worker tests cannot validate.
**Recommendation:** Add live coverage for at least a `Write` round-trip and an invalid-handle command, plus a worker-fault/abnormal-exit scenario, even if behind additional opt-in env vars.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: Added two `[LiveMxAccessFact]`-gated tests to `WorkerLiveMxAccessSmokeTests`. `GatewaySession_WithLiveWorker_WritesValueToAdvisedItem` registers/adds/advises then issues a `Write` of an integer value, asserting the command round-trips with `ProtocolStatusCode.Ok` and `MxCommandKind.Write`. `GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault` issues `AddItem` against `int.MaxValue` as the server handle (never issued by MXAccess) and asserts the failure surfaces in the command reply without a usable item handle. Both reuse the existing opt-in env var and the `ShutDownAsync` cleanup helper. A worker-fault/abnormal-exit case was deliberately scoped out — it needs a controlled COM crash injection beyond what the existing harness supports; the two added cases cover the `Write` round-trip and invalid-handle paths the recommendation calls out. Live execution was not possible in this environment; verified by build.
### IntegrationTests-006
@@ -108,13 +108,13 @@
| Severity | Medium |
| Category | Testing coverage |
| Location | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` |
| Status | Open |
| Status | Resolved |
**Description:** LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticator` has distinct branches for (the `LdapException` catch, the `candidate is null` branch). The negative test only proves group-membership denial, not credential rejection.
**Recommendation:** Add a live test for `admin` with a wrong password asserting `Succeeded == false` and that the password is not leaked into `FailureMessage`, and a test for an unknown username.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: Added three `[LiveLdapFact]`-gated tests to `DashboardLdapLiveTests`. `AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword` exercises the `LdapException` catch via a rejected candidate bind and asserts the wrong password never reaches `FailureMessage`. `AuthenticateAsync_UnknownUsername_Fails` exercises the `candidate is null` branch. `AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing` builds the authenticator with `LdapOptions.Port = 1` (a reserved port no LDAP server listens on) and asserts the connect failure is absorbed into a failed result rather than thrown — covering the generic `catch (Exception)` branch. All three are gated by the existing `MXGATEWAY_RUN_LIVE_LDAP_TESTS` opt-in so they stay opt-in. Live execution was not possible in this environment (no live LDAP); verified by build.
### IntegrationTests-007
+20 -20
View File
@@ -13,14 +13,14 @@ Each module's `findings.md` is the source of truth; this file is generated from
| [Client.Dotnet](Client.Dotnet/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 5 | 8 |
| [Client.Go](Client.Go/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 7 | 10 |
| [Client.Java](Client.Java/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 7 | 12 |
| [Client.Python](Client.Python/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 12 | 12 |
| [Client.Python](Client.Python/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 9 | 12 |
| [Client.Rust](Client.Rust/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 0 | 12 |
| [Contracts](Contracts/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 8 |
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 10 |
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 4 | 10 |
| [Server](Server/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 14 |
| [Tests](Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 10 | 12 |
| [Tests](Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 6 | 12 |
| [Worker](Worker/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 7 | 15 |
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 13 | 15 |
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 15 |
## Pending findings
@@ -28,23 +28,7 @@ Findings with status `Open` or `In Progress`, ordered by severity.
| ID | Severity | Category | Location | Description |
|---|---|---|---|---|
| Client.Python-003 | Medium | Error handling & resilience | `clients/python/src/mxgateway/client.py:125-137,155-173` | `stream_events_raw` and `query_active_alarms` call the stub directly with a `timeout` kwarg when `stream_timeout` is set, with no `TypeError` fallback. `galaxy.py:watch_deploy_events` and `_unary` *do* have a fallback that strips `timeout`… |
| Client.Python-005 | Medium | Performance & resource management | `clients/python/src/mxgateway/galaxy.py:117-140` | `discover_hierarchy` pages through the entire Galaxy object hierarchy and accumulates every `GalaxyObject` (each carrying its full attribute list) into a single in-memory `list` before returning. For a large Galaxy this is a very large all… |
| Client.Python-009 | Medium | Testing coverage | `clients/python/tests/` | Several non-trivial public paths are untested: `Session.write2`/`add_item2` request construction; the bulk-size limit `_ensure_bulk_size`/`MAX_BULK_ITEMS` guard; the `None`-argument `TypeError` guards in bulk methods; the TLS `ca_file` rea… |
| Contracts-002 | Medium | Error handling & resilience | `src/MxGateway.Contracts/Protos/mxaccess_gateway.proto:384-385`, `:95` | `MxCommandKind` includes `MX_COMMAND_KIND_ACKNOWLEDGE_ALARM_BY_NAME = 29` and `MxCommand.payload` carries `AcknowledgeAlarmByNameCommand acknowledge_alarm_by_name_command = 38`, but `MxCommandReply.payload` has only `acknowledge_alarm = 34… |
| IntegrationTests-003 | Medium | Correctness & logic bugs | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` | The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registratio… |
| IntegrationTests-004 | Medium | Error handling & resilience | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` | In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutExcep… |
| IntegrationTests-005 | Medium | Testing coverage | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` | The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` sem… |
| IntegrationTests-006 | Medium | Testing coverage | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` | LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticat… |
| Tests-003 | Medium | Performance & resource management | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` | `CreateTempDatabasePath` creates a fresh directory under `%TEMP%\mxgateway-auth-tests\<guid>` (and `...-cli-tests`) for every test but nothing ever deletes it. `WorkerProcessLauncherTests.TestDirectory` correctly implements `IDisposable` a… |
| Tests-004 | Medium | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` | The authorization interceptor and `MxAccessGatewayService` are each tested in isolation, but no test composes the interceptor in front of the real service to confirm scope enforcement gates real RPCs end-to-end. A wiring mistake — intercep… |
| Tests-005 | Medium | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` | Worker-crash handling is only tested as a clean terminal exception from `ReadEventsAsync` or a pre-set `ShutdownException`. There is no test for a worker that faults mid-command — an `InvokeAsync` in flight when the pipe/worker dies — whic… |
| Tests-006 | Medium | Concurrency & thread safety | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` | Several tests rely on fixed `Task.Delay` values: `WorkerClientTests.InvokeAsync_WithLateReply…` waits a hard-coded 50 ms after writing a late reply before issuing the second command, and the heartbeat tests use a 20 ms delay to make timest… |
| Worker.Tests-003 | Medium | Concurrency & thread safety | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` | `InvokeAsync_WakesIdlePumpForQueuedCommand` asserts `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` — a wall-clock assertion that on a loaded CI agent can exceed 2s, producing a false failure. The test also does not actually prove the wake e… |
| Worker.Tests-004 | Medium | Concurrency & thread safety | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` | `StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` use poll-until loops, and `Dispose_StopsAlarmPollLoop` additionally does `await Task.Delay(1000)` then asserts `PollCount` is unchanged. The… |
| Worker.Tests-005 | Medium | Performance & resource management | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` | `MemoryStream` instances are created and never disposed across the frame-protocol and pipe-session tests (`MemoryStream stream = new();` with no `using`). Disposal is cheap so impact is low, but it is inconsistent with the rest of the suit… |
| Worker.Tests-006 | Medium | Performance & resource management | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` | `Dispose_StopsAlarmPollLoop` constructs `MxAccessStaSession session` without `using` (unlike every sibling test) and relies on an explicit `session.Dispose()`. If an assertion between `StartAsync` and `Dispose()` throws, the session — its… |
| Worker.Tests-007 | Medium | Design-document adherence | `docs/WorkerFrameProtocol.md:38-49` | `docs/WorkerFrameProtocol.md` instructs running `dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests` and states the frame protocol "is part of `MxGateway.Server`". The frame protocol actually lives in… |
| Client.Dotnet-004 | Low | Error handling & resilience | `clients/dotnet/MxGateway.Client/MxGatewayClient.cs:283-294`, `clients/dotnet/MxGateway.Client/GalaxyRepositoryClient.cs:392-403` | `ExecuteSafeUnaryAsync` wraps the whole Polly retry pipeline in a single linked CTS cancelled after `Options.DefaultCallTimeout`, while `CreateCallOptions` also stamps each individual call with a `DefaultCallTimeout` gRPC deadline. The ret… |
| Client.Dotnet-005 | Low | Correctness & logic bugs | `clients/dotnet/MxGateway.Client/MxGatewaySession.cs:82,124,175` | `RegisterAsync`/`AddItemAsync`/`AddItem2Async` return `reply.<Typed>?.ServerHandle ?? reply.ReturnValue.Int32Value`. After `EnsureMxAccessSuccess()` passes, a missing typed payload silently falls back to `ReturnValue.Int32Value`, which for… |
| Client.Dotnet-006 | Low | Code organization & conventions | `clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs:50`, `clients/dotnet/MxGateway.Client/MxGatewayClientContractInfo.cs:10-14` | `MxGatewayClientOptions.MaxGrpcMessageBytes` and the two `const`s in `MxGatewayClientContractInfo` are public members with no XML doc comments, inconsistent with every other public member in the assembly and with the repo's documented C# s… |
@@ -146,17 +130,33 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
| Client.Java-003 | Medium | Resolved | mxaccessgw conventions | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:119-140` |
| Client.Java-004 | Medium | Resolved | Correctness & logic bugs | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:114-120,157-163,191-197` |
| Client.Java-005 | Medium | Resolved | Error handling & resilience | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:92-105` |
| Client.Python-003 | Medium | Resolved | Error handling & resilience | `clients/python/src/mxgateway/client.py:125-137,155-173` |
| Client.Python-005 | Medium | Resolved | Performance & resource management | `clients/python/src/mxgateway/galaxy.py:117-140` |
| Client.Python-009 | Medium | Resolved | Testing coverage | `clients/python/tests/` |
| Client.Rust-005 | Medium | Resolved | Correctness & logic bugs | `clients/rust/src/session.rs:489-520` |
| Client.Rust-006 | Medium | Resolved | Error handling & resilience | `clients/rust/src/session.rs:531-555` |
| IntegrationTests-003 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` |
| IntegrationTests-004 | Medium | Resolved | Error handling & resilience | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` |
| IntegrationTests-005 | Medium | Resolved | Testing coverage | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` |
| IntegrationTests-006 | Medium | Resolved | Testing coverage | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` |
| Server-002 | Medium | Resolved | Design-document adherence | `src/MxGateway.Server/Program.cs:24`, `src/MxGateway.Server/GatewayApplication.cs` |
| Server-004 | Medium | Resolved | Code organization & conventions | `src/MxGateway.Server/Security/Authentication/ApiKeyAdminCommandLineParser.cs:227-233`, `src/MxGateway.Server/Security/Authentication/ApiKeyAdminCliRunner.cs:53-77`, `src/MxGateway.Server/Dashboard/DashboardApiKeyManagementService.cs:21-67` |
| Server-005 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Server/Galaxy/GalaxyHierarchyRefreshService.cs:22-28`, `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:184` |
| Server-006 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Server/Sessions/SessionManager.cs:84-114` |
| Tests-003 | Medium | Resolved | Performance & resource management | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` |
| Tests-004 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` |
| Tests-005 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` |
| Tests-006 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` |
| Worker-004 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:565-588` |
| Worker-005 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:205-258` (production alarm poll loop) |
| Worker-006 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:117-124`, `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:386-491` |
| Worker-007 | Medium | Resolved | mxaccessgw conventions | `src/MxGateway.Worker/MxAccess/MxAccessComServer.cs:130-150` |
| Worker-008 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:205-249`, `:429-447` |
| Worker.Tests-003 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` |
| Worker.Tests-004 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` |
| Worker.Tests-005 | Medium | Resolved | Performance & resource management | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` |
| Worker.Tests-006 | Medium | Resolved | Performance & resource management | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` |
| Worker.Tests-007 | Medium | Resolved | Design-document adherence | `docs/WorkerFrameProtocol.md:38-49` |
| Client.Rust-004 | Low | Resolved | Documentation & comments | `clients/rust/src/version.rs:7` |
| Client.Rust-007 | Low | Resolved | Design-document adherence | `clients/rust/RustClientDesign.md:14-55` |
| Client.Rust-008 | Low | Resolved | Performance & resource management | `clients/rust/src/value.rs:161-261` |
+11 -9
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `6c64030` |
| Status | Reviewed |
| Open findings | 10 |
| Open findings | 6 |
## Checklist coverage
@@ -65,13 +65,13 @@
| Severity | Medium |
| Category | Performance & resource management |
| Location | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` |
| Status | Open |
| Status | Resolved |
**Description:** `CreateTempDatabasePath` creates a fresh directory under `%TEMP%\mxgateway-auth-tests\<guid>` (and `...-cli-tests`) for every test but nothing ever deletes it. `WorkerProcessLauncherTests.TestDirectory` correctly implements `IDisposable` and cleans up; these two do not. SQLite connection pooling can also keep the `.db` handle open after the test. Over many CI runs this leaks temp files and open handles.
**Recommendation:** Wrap the temp directory in an `IDisposable`/`IAsyncDisposable` helper (as `WorkerProcessLauncherTests` does) and call `SqliteConnection.ClearAllPools()` before deletion, or use `Microsoft.Data.Sqlite` in-memory mode where a real file is not needed.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed root cause — both `CreateTempDatabasePath` helpers created `%TEMP%` directories with no cleanup, and `Microsoft.Data.Sqlite` pools connections by default so the `.db` handle outlives the test. Added a shared `TempDatabaseDirectory` (`src/MxGateway.Tests/Security/Authentication/TempDatabaseDirectory.cs`) `IDisposable` helper that calls `SqliteConnection.ClearAllPools()` and recursively deletes its directory. `SqliteAuthStoreTests` and `ApiKeyAdminCliRunnerTests` now implement `IDisposable`, track every directory created via `CreateTempDatabasePath`, and dispose them after each test. All affected tests still pass.
### Tests-004
@@ -80,13 +80,13 @@
| Severity | Medium |
| Category | Testing coverage |
| Location | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` |
| Status | Open |
| Status | Resolved |
**Description:** The authorization interceptor and `MxAccessGatewayService` are each tested in isolation, but no test composes the interceptor in front of the real service to confirm scope enforcement gates real RPCs end-to-end. A wiring mistake — interceptor not registered, or a new RPC added without a scope mapping in `GatewayGrpcScopeResolver` — would pass every existing test. `GatewayGrpcScopeResolverTests` also only checks an enumerated allow-list; it never asserts an unmapped request type fails closed.
**Recommendation:** Add an end-to-end test that runs `OpenSession`/`Invoke` through the interceptor+service composition with insufficient scope and asserts `PermissionDenied`; add a `GatewayGrpcScopeResolver` test asserting an unknown/unmapped request type throws or denies rather than returning a permissive default.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed the coverage gap. Added three interceptor+service composition tests to `GatewayGrpcAuthorizationInterceptorTests` that run the real `GatewayGrpcAuthorizationInterceptor` continuation into a real `MxAccessGatewayService`: `InterceptorComposedWithService_OpenSessionMissingScope_DeniesBeforeServiceRuns` (asserts `PermissionDenied` and `OpenSessionCount == 0`), `InterceptorComposedWithService_OpenSessionWithScope_RunsServiceWithIdentity` (service runs and observes the interceptor-pushed identity), and `InterceptorComposedWithService_InvokeWriteCommandWithReadScope_DeniesBeforeServiceRuns` (a `Write` command with only `invoke:read` is denied). Added two `GatewayGrpcScopeResolverTests`: `ResolveRequiredScope_UnmappedRequestType_FailsClosedToAdminScope` confirms an unmapped request type resolves to the most-restrictive `Admin` scope (the resolver's `_ => GatewayScopes.Admin` default already fails closed — no product bug), and `ResolveRequiredScope_UnknownInvokeCommandKind_ReturnsInvokeReadScope` confirms an unknown command kind does not silently grant write/admin access.
### Tests-005
@@ -95,13 +95,13 @@
| Severity | Medium |
| Category | Testing coverage |
| Location | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` |
| Status | Open |
| Status | Resolved |
**Description:** Worker-crash handling is only tested as a clean terminal exception from `ReadEventsAsync` or a pre-set `ShutdownException`. There is no test for a worker that faults mid-command — an `InvokeAsync` in flight when the pipe/worker dies — which is a core fault-handling path of the two-process design. `WorkerClientTests` covers pipe-disconnect faulting the read loop, but not the interaction where a pending `InvokeAsync` task observes the fault and surfaces a meaningful error code.
**Recommendation:** Add a `WorkerClient`/`SessionManager` test that disposes the worker pipe (or emits a `WorkerFault`) while an `InvokeAsync` is pending, and assert the invoke task fails with a `WorkerClientException`/`SessionManagerException` carrying the worker-faulted error code.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed the coverage gap and confirmed the product path already handles it correctly (`WorkerClient.ReadLoopAsync``SetFaulted``CompletePendingCommands(fault)` fails every pending command with the fault exception). Added two `WorkerClientTests`: `InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected` (worker reads the command then disposes its pipe side; the pending invoke task fails with `WorkerClientErrorCode.PipeDisconnected`) and `InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted` (worker emits a `WorkerFault` envelope while the invoke is pending; the task fails with `WorkerClientErrorCode.WorkerFaulted`). Both also assert the client transitions to `Faulted`. No product change needed.
### Tests-006
@@ -110,13 +110,15 @@
| Severity | Medium |
| Category | Concurrency & thread safety |
| Location | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` |
| Status | Open |
| Status | Resolved |
**Description:** Several tests rely on fixed `Task.Delay` values: `WorkerClientTests.InvokeAsync_WithLateReply…` waits a hard-coded 50 ms after writing a late reply before issuing the second command, and the heartbeat tests use a 20 ms delay to make timestamps strictly increase. On a slow CI agent the 50 ms delay can be insufficient, and `DateTimeOffset.UtcNow` resolution can make the 20 ms heartbeat-advance assertion flaky.
**Recommendation:** Replace fixed delays with the existing `WaitUntilAsync` condition polling, and inject a controllable `TimeProvider` for heartbeat-timestamp comparisons instead of relying on wall-clock advance.
**Resolution:** _(open)_
**Re-triage note:** The brief flagged `ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess` as "a real `WorkerClient` fault→kill bug". On inspection it is **not a product bug** — it is a test race. `WorkerClient.SetFaulted` publishes the `Faulted` state under lock *before* calling `KillOwnedProcess`, so the old test's `WaitUntilAsync(() => client.State == Faulted)` could return between those two statements and observe `process.KillCount == 0`. The kill itself always runs synchronously inside `SetFaulted`, and `ShutdownAsync`/`DisposeAsync` re-issue an idempotent kill, so no real consumer relies on "state==Faulted implies process dead". The fix is therefore a test-quality fix (correctly Medium / Concurrency), not a product fix.
**Resolution:** Resolved 2026-05-18: (1) Made `ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess` deterministic — it now `await`s `FakeWorkerProcess.WaitForExitAsync` (the `TaskCompletionSource` completed inside `Kill()`), which completes exactly when the kill runs, eliminating the state-polling race; verified by running it five times in isolation (5/5 pass). (2) Removed the fixed 50 ms `Task.Delay` from `InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady` — the stale reply and the second reply are now sent in pipe (FIFO) order, so the read loop discards the stale reply before the second reply with no timing window. (3) Replaced the 20 ms `Task.Delay` heartbeat-advance hacks in `WorkerClientTests.ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess` and `FakeWorkerHarnessTests.SendHeartbeatAsync_UpdatesClientHeartbeatState` with an injected `ManualTimeProvider` advanced by a fixed `TimeSpan`; both tests now assert the exact post-advance timestamp instead of `>` against wall-clock drift.
### Tests-007
+11 -11
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `6c64030` |
| Status | Reviewed |
| Open findings | 13 |
| Open findings | 8 |
## Checklist coverage
@@ -63,13 +63,13 @@
| Severity | Medium |
| Category | Concurrency & thread safety |
| Location | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` |
| Status | Open |
| Status | Resolved |
**Description:** `InvokeAsync_WakesIdlePumpForQueuedCommand` asserts `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` — a wall-clock assertion that on a loaded CI agent can exceed 2s, producing a false failure. The test also does not actually prove the wake event (vs the 50 ms idle pump) caused the dispatch.
**Recommendation:** Remove the wall-clock assertion (the awaited result already proves the command ran), or raise the budget substantially with a comment that it is a coarse smoke check.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — Removed the `Stopwatch` and the `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` wall-clock assertion from `InvokeAsync_WakesIdlePumpForQueuedCommand`. The test already constructs the `StaRuntime` with a 30-second idle pump period, so the awaited `InvokeAsync` completing at all proves the command wake event — not the idle pump tick — drove the dispatch; no timing budget is needed. The XML-doc comment now states this explicitly. The now-unused `using System.Diagnostics;` was removed (`TreatWarningsAsErrors`).
### Worker.Tests-004
@@ -78,13 +78,13 @@
| Severity | Medium |
| Category | Concurrency & thread safety |
| Location | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` |
| Status | Open |
| Status | Resolved |
**Description:** `StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` use poll-until loops, and `Dispose_StopsAlarmPollLoop` additionally does `await Task.Delay(1000)` then asserts `PollCount` is unchanged. The 1s "no further polls" window is a timing race: a poll scheduled just before disposal could increment the counter afterward, and a slow agent could simply not run a poll in the window even without correct stop logic.
**Recommendation:** Make the poll loop deterministically observable — expose a "poll loop stopped" signal or have `Dispose` join the poll task — then assert on that rather than on elapsed-time silence.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — `MxAccessStaSession.Dispose` now joins the alarm poll task (`pollTaskToJoin.Wait(TimeSpan.FromSeconds(5))`) after cancelling the poll CTS, instead of setting `alarmPollTask = null` and discarding it. Once `Dispose` returns, the poll loop has provably exited and no `PollOnce` call can still be in flight. `Dispose_StopsAlarmPollLoop` was rewritten to drop the `await Task.Delay(1000)` "no further polls" window: it now captures `PollCount` immediately after `Dispose()` returns and re-asserts equality after a bare `await Task.Yield()` — a deterministic frozen-count check rather than an elapsed-time race. The success-direction poll-until loop in `PollOnceCalledViaSta` was left as-is: waiting for an event to *occur* is sound; only waiting for an event to *not* occur is the race, and that pattern is now eliminated. Note: `ShutdownGracefullyAsync` already joined the poll task, so this change makes `Dispose` consistent with the graceful path.
### Worker.Tests-005
@@ -93,13 +93,13 @@
| Severity | Medium |
| Category | Performance & resource management |
| Location | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` |
| Status | Open |
| Status | Resolved |
**Description:** `MemoryStream` instances are created and never disposed across the frame-protocol and pipe-session tests (`MemoryStream stream = new();` with no `using`). Disposal is cheap so impact is low, but it is inconsistent with the rest of the suite (which carefully `using`s `CancellationTokenSource`, `StaRuntime`, `PipePair`). `WorkerFrameWriter`/`WorkerFrameReader` are also constructed without disposal.
**Recommendation:** Wrap `MemoryStream` (and reader/writer if they are `IDisposable`) in `using` declarations for consistency.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — All six `MemoryStream` test-body declarations in `WorkerFrameProtocolTests.cs` and the five `inbound`/`outbound` `MemoryStream` declarations in the `WorkerPipeSessionTests.cs` handshake tests were converted to `using` declarations, matching how the rest of the suite handles `CancellationTokenSource`/`StaRuntime`/`PipePair`. Re-triage of the parenthetical: `WorkerFrameWriter` and `WorkerFrameReader` are **not** `IDisposable` (`sealed class` with no `IDisposable` and no `Dispose` member — verified in `src/MxGateway.Worker/Ipc/`), so the finding's "reader/writer if they are `IDisposable`" suggestion does not apply and no change was made there. The shared `MemoryStream` instances inside the `WorkerPipeSessionTests` harness/helper classes (`ReadWrittenFrames` parameter, the `PipePair`/harness fields) are out of the cited line scope and were left untouched.
### Worker.Tests-006
@@ -108,13 +108,13 @@
| Severity | Medium |
| Category | Performance & resource management |
| Location | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` |
| Status | Open |
| Status | Resolved |
**Description:** `Dispose_StopsAlarmPollLoop` constructs `MxAccessStaSession session` without `using` (unlike every sibling test) and relies on an explicit `session.Dispose()`. If an assertion between `StartAsync` and `Dispose()` throws, the session — its STA thread and poll loop — leaks for the rest of the run. The `StaRuntime` is `using`d so the thread is eventually reclaimed, but the alarm poll loop and handler are not.
**Recommendation:** Use `using MxAccessStaSession session = ...` and drop the manual `Dispose()`, or wrap the body in try/finally.
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — `Dispose_StopsAlarmPollLoop` now declares its `MxAccessStaSession` with a `using` declaration. The manual `session.Dispose()` is kept because the test's purpose is to observe poll behaviour across disposal — but `MxAccessStaSession.Dispose` is idempotent (guarded by the `disposed` field), so the explicit mid-test call and the `using`-scope call do not conflict. An assertion thrown anywhere in the body now still tears the session (STA poll loop + alarm handler) down. The cited line numbers in the finding were imprecise — they straddle `PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` — but the described root cause (one `MxAccessStaSession` constructed without `using`) was singular and is the one in `Dispose_StopsAlarmPollLoop`; the sibling tests `PollOnceCalledViaSta` and `RunAlarmPollLoop_WhenPollOnceThrows_RecordsFaultOnEventQueue` already used `using` and needed no change.
### Worker.Tests-007
@@ -123,13 +123,13 @@
| Severity | Medium |
| Category | Design-document adherence |
| Location | `docs/WorkerFrameProtocol.md:38-49` |
| Status | Open |
| Status | Resolved |
**Description:** `docs/WorkerFrameProtocol.md` instructs running `dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests` and states the frame protocol "is part of `MxGateway.Server`". The frame protocol actually lives in `MxGateway.Worker.Ipc` and is tested by `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs`. The doc's verification command points at the wrong project and build, so anyone following it after changing the worker frame protocol will not run the relevant tests.
**Recommendation:** Update `docs/WorkerFrameProtocol.md` to reference `src/MxGateway.Worker.Tests` and the x86 worker build (`-p:Platform=x86`).
**Resolution:** _(open)_
**Resolution:** 2026-05-18 — Rewrote the `## Verification` section of `docs/WorkerFrameProtocol.md`. The test command now targets `src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter WorkerFrameProtocolTests`; the build command now targets `src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86`. The prose now states the frame protocol lives in `MxGateway.Worker.Ipc` (naming `WorkerFrameReader`/`WorkerFrameWriter`/`WorkerFrameProtocolOptions` and the `WorkerFrameProtocolTests.cs` test file) and notes the worker is an x86 process. Verified against the source: the frame-protocol types are confirmed under `src/MxGateway.Worker/Ipc/` and the tests under `src/MxGateway.Worker.Tests/Ipc/`, so the original doc was wrong on both project and component. Fenced code blocks were also relabelled `powershell` (the build/test commands are run from PowerShell on this Windows dev box).
### Worker.Tests-008
+22 -3
View File
@@ -44,9 +44,22 @@ skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
the installed MXAccess COM object and depends on live provider state.
The live smoke opens a gateway session, launches the x86 worker, runs
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
graceful shutdown request even when a command or event assertion fails.
`Register`, `AddItem`, and `Advise`, waits a bounded time for the first
`OnDataChange` event (skipping any earlier bootstrap/registration-state event),
and closes the session in a `finally` block so the worker gets a graceful
shutdown request even when a command or event assertion fails. Cleanup failures
in that `finally` block are logged rather than thrown, so a real assertion
failure is never masked by a shutdown timeout.
`WorkerLiveMxAccessSmokeTests` additionally covers two MXAccess parity paths the
fake-worker tests cannot validate:
- a `Write` round-trip against an advised item, and
- an `AddItem` against an invalid server handle, asserting the MXAccess failure
surfaces in the command reply without faulting the gateway transport.
All three tests are gated by the same `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1`
opt-in variable.
Build the worker before running the smoke:
@@ -119,6 +132,12 @@ GLAuth has only the baseline groups, so this is a hard prerequisite beyond "LDAP
is up." See the "Adding a gw-specific group" section of `glauth.md` for the
provisioning step that adds `GwAdmin` and grants it to `admin`.
The suite covers both the success path and the `DashboardAuthenticator` failure
branches: `admin` in `GwAdmin` succeeds; `readonly` is denied for missing group;
`admin` with a wrong password is rejected by the candidate bind without leaking
the password into `FailureMessage`; an unknown username yields no candidate; and
an unreachable LDAP server is absorbed into a failed result rather than throwing.
Run the LDAP live tests explicitly:
```bash
+11 -6
View File
@@ -35,17 +35,22 @@ oversized frames, protocol version mismatches, and session mismatches.
## Verification
The frame protocol lives in `MxGateway.Worker.Ipc` (`WorkerFrameReader`,
`WorkerFrameWriter`, `WorkerFrameProtocolOptions`) and is covered by
`src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs`. The worker is an
x86 process, so build and test it with `-p:Platform=x86`.
Run the focused tests after changing the frame protocol:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
```powershell
dotnet test src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter WorkerFrameProtocolTests
```
Run the gateway build because the frame protocol is part of
`MxGateway.Server`:
Run the x86 worker build because the frame protocol is part of
`MxGateway.Worker`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```powershell
dotnet build src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86
```
## Related Documentation
@@ -43,6 +43,69 @@ public sealed class DashboardLdapLiveTests
Assert.DoesNotContain("readonly123", result.FailureMessage, StringComparison.Ordinal);
}
[LiveLdapFact]
[Trait("Category", "LiveLdap")]
public async Task AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword()
{
// Exercises the LdapException branch: the user exists and the service
// account search succeeds, but the candidate bind is rejected.
const string wrongPassword = "definitely-not-the-admin-password";
DashboardAuthenticator authenticator = CreateAuthenticator();
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"admin",
wrongPassword,
CancellationToken.None);
Assert.False(result.Succeeded);
Assert.Null(result.Principal);
Assert.DoesNotContain(wrongPassword, result.FailureMessage, StringComparison.Ordinal);
}
[LiveLdapFact]
[Trait("Category", "LiveLdap")]
public async Task AuthenticateAsync_UnknownUsername_Fails()
{
// Exercises the `candidate is null` branch: the service-account search
// returns no entry, so no candidate bind is attempted.
DashboardAuthenticator authenticator = CreateAuthenticator();
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"no-such-user-9f3c1",
"irrelevant-password",
CancellationToken.None);
Assert.False(result.Succeeded);
Assert.Null(result.Principal);
}
[LiveLdapFact]
[Trait("Category", "LiveLdap")]
public async Task AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing()
{
// Exercises the connect-failure path: a closed loopback port produces a
// connection error that DashboardAuthenticator must absorb into a Fail
// result rather than propagating an exception to the dashboard.
DashboardAuthenticator authenticator = new(
Options.Create(new GatewayOptions
{
Ldap = new LdapOptions
{
// 1 is a reserved port number that no LDAP server listens on.
Port = 1,
},
}),
NullLogger<DashboardAuthenticator>.Instance);
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"admin",
"admin123",
CancellationToken.None);
Assert.False(result.Succeeded);
Assert.Null(result.Principal);
}
private static DashboardAuthenticator CreateAuthenticator()
{
return new DashboardAuthenticator(
@@ -86,8 +86,15 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
LogReply("Advise", adviseReply);
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// A live MXAccess provider can deliver an initial registration-state
// or bad-quality bootstrap event before the OnDataChange the worker
// is contracted to emit. Match on the family rather than trusting
// whatever event arrives first so a genuine ordering defect cannot
// pass spuriously or leave a later wrong event unverified.
MxEvent dataChange = await eventWriter
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
.WaitForMessageAsync(
candidate => candidate.Family == MxEventFamily.OnDataChange,
IntegrationTestEnvironment.LiveMxAccessEventTimeout)
.ConfigureAwait(false);
LogEvent(dataChange);
@@ -98,22 +105,184 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
}
finally
{
try
{
if (!string.IsNullOrWhiteSpace(sessionId))
{
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
}
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
}
}
if (streamTask is not null)
/// <summary>
/// Verifies that a Write command round-trips through live MXAccess against an advised item.
/// </summary>
[LiveMxAccessFact]
[Trait("Category", "LiveMxAccess")]
public async Task GatewaySession_WithLiveWorker_WritesValueToAdvisedItem()
{
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
Assert.True(
File.Exists(workerExecutablePath),
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
TestWorkerProcessFactory processFactory = new(output);
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
string? sessionId = null;
Task? streamTask = null;
try
{
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
finally
ClientSessionName = "live-mxaccess-write",
ClientCorrelationId = "live-open-write",
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
},
new TestServerCallContext()).ConfigureAwait(false);
sessionId = openReply.SessionId;
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
RecordingServerStreamWriter<MxEvent> eventWriter = new();
streamTask = fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
eventWriter,
new TestServerCallContext());
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Register", registerReply);
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("AddItem", addItemReply);
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
Assert.True(addItemReply.AddItem.ItemHandle > 0);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(
sessionId,
registerReply.Register.ServerHandle,
addItemReply.AddItem.ItemHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Advise", adviseReply);
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
MxCommandReply writeReply = await fixture.Service.Invoke(
CreateWriteRequest(
sessionId,
registerReply.Register.ServerHandle,
addItemReply.AddItem.ItemHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Write", writeReply);
// The gateway must always report a protocol-level status. MXAccess
// parity details (a write rejection, a secured-item failure) belong
// in hresult / statuses, not in a transport failure — the command
// itself completed its round-trip to the worker and back.
Assert.Equal(ProtocolStatusCode.Ok, writeReply.ProtocolStatus.Code);
Assert.Equal(MxCommandKind.Write, writeReply.Kind);
}
finally
{
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
}
}
/// <summary>
/// Verifies that an AddItem against an invalid server handle surfaces the MXAccess failure
/// without faulting the gateway transport, exercising the invalid-handle parity path.
/// </summary>
[LiveMxAccessFact]
[Trait("Category", "LiveMxAccess")]
public async Task GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault()
{
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
Assert.True(
File.Exists(workerExecutablePath),
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
TestWorkerProcessFactory processFactory = new(output);
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
string? sessionId = null;
try
{
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "live-mxaccess-invalid-handle",
ClientCorrelationId = "live-open-invalid",
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
},
new TestServerCallContext()).ConfigureAwait(false);
sessionId = openReply.SessionId;
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
// Deliberately skip Register: server handle 0x7FFFFFFF was never
// issued by MXAccess. The worker must invoke COM and relay the
// invalid-handle failure rather than the gateway short-circuiting.
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, serverHandle: int.MaxValue),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("AddItem(invalid-handle)", addItemReply);
// MXAccess parity: an invalid handle is an MXAccess-level failure.
// The command still completed its worker round-trip, so the gateway
// protocol status is Ok and the failure shows up in hresult / the
// status proxies — it must not be reported as a transport fault.
Assert.NotEqual(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
Assert.True(
addItemReply.AddItem is null || addItemReply.AddItem.ItemHandle <= 0,
"Invalid-handle AddItem must not yield a usable item handle.");
}
finally
{
await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false);
}
}
/// <summary>
/// Closes the session and drains the event stream / worker processes without letting a
/// cleanup timeout mask the original failure from the test body.
/// </summary>
private async Task ShutDownAsync(
GatewayServiceFixture fixture,
TestWorkerProcessFactory processFactory,
string? sessionId,
Task? streamTask)
{
try
{
if (!string.IsNullOrWhiteSpace(sessionId))
{
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
}
if (streamTask is not null)
{
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
catch (Exception ex)
{
// Cleanup runs in a finally block. A TimeoutException (or a faulted
// StreamEvents task) here would otherwise replace any assertion
// failure raised in the try block. Log it and let the original
// failure surface.
output.WriteLine($"Cleanup error during session/stream shutdown: {ex}");
}
try
{
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
catch (Exception ex)
{
output.WriteLine($"Cleanup error while waiting for worker processes to exit: {ex}");
}
}
@@ -175,6 +344,32 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
};
}
private static MxCommandRequest CreateWriteRequest(
string sessionId,
int serverHandle,
int itemHandle)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-write",
Command = new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = new MxValue
{
DataType = MxDataType.Integer,
Int32Value = 1,
},
},
},
};
}
private async Task CloseSessionAsync(
GatewayServiceFixture fixture,
string sessionId)
@@ -321,8 +516,8 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
private readonly object syncRoot = new();
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> messages = [];
private readonly SemaphoreSlim messageArrived = new(0);
/// <summary>
/// All messages that have been written to the stream.
@@ -344,7 +539,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
public WriteOptions? WriteOptions { get; set; }
/// <summary>
/// Records the message and completes the first-message task.
/// Records the message and signals any pending waiter.
/// </summary>
/// <param name="message">The message to write.</param>
public Task WriteAsync(T message)
@@ -354,18 +549,51 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
messages.Add(message);
}
firstMessage.TrySetResult(message);
messageArrived.Release();
return Task.CompletedTask;
}
/// <summary>
/// Waits for the first message up to the specified timeout.
/// Waits for the first recorded message that satisfies <paramref name="predicate"/>,
/// up to the specified timeout. Earlier non-matching messages (for example a
/// registration-state bootstrap event) are skipped rather than treated as the result.
/// </summary>
/// <param name="timeout">The maximum time to wait.</param>
/// <returns>The first message written to the stream.</returns>
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
/// <param name="predicate">Filter the awaited message must satisfy.</param>
/// <param name="timeout">The maximum total time to wait.</param>
/// <returns>The first message that satisfies the predicate.</returns>
public async Task<T> WaitForMessageAsync(
Func<T, bool> predicate,
TimeSpan timeout)
{
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
using CancellationTokenSource timeoutCancellation = new(timeout);
int scanned = 0;
while (true)
{
T[] snapshot;
lock (syncRoot)
{
snapshot = messages.ToArray();
}
for (; scanned < snapshot.Length; scanned++)
{
if (predicate(snapshot[scanned]))
{
return snapshot[scanned];
}
}
try
{
await messageArrived.WaitAsync(timeoutCancellation.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (timeoutCancellation.IsCancellationRequested)
{
throw new TimeoutException(
$"No stream message satisfied the predicate within {timeout}. Recorded {scanned} message(s).");
}
}
}
}
@@ -110,16 +110,21 @@ public sealed class FakeWorkerHarnessTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
/// <summary>Verifies that sending a heartbeat updates the client heartbeat state.</summary>
/// <summary>
/// Verifies that sending a heartbeat updates the client heartbeat state. Uses a
/// <see cref="ManualTimeProvider"/> so the timestamp advance is deterministic rather
/// than relying on a wall-clock <c>Task.Delay</c> exceeding clock resolution.
/// </summary>
[Fact]
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
{
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await using WorkerClient client = fakeWorker.CreateClient(timeProvider: clock);
await StartClientAsync(fakeWorker, client);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
clock.Advance(TimeSpan.FromSeconds(1));
await fakeWorker.SendHeartbeatAsync(
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
@@ -128,6 +133,7 @@ public sealed class FakeWorkerHarnessTests
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.Equal(previousHeartbeat + TimeSpan.FromSeconds(1), client.LastHeartbeatAt);
}
/// <summary>Verifies that a hung worker times out pending command invocations.</summary>
@@ -215,4 +221,17 @@ public sealed class FakeWorkerHarnessTests
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
/// <summary>Time provider with a manually advanced clock for deterministic timestamp tests.</summary>
private sealed class ManualTimeProvider(DateTimeOffset start) : TimeProvider
{
private DateTimeOffset _now = start;
/// <inheritdoc />
public override DateTimeOffset GetUtcNow() => _now;
/// <summary>Advances the manual clock by the given amount.</summary>
/// <param name="delta">Amount of time to add to the current clock value.</param>
public void Advance(TimeSpan delta) => _now += delta;
}
}
@@ -71,9 +71,11 @@ public sealed class WorkerClientTests
async () => await timedOutInvokeTask);
Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
// Send the stale reply for the already-timed-out command, then the second
// command's reply. The pipe is FIFO, so the read loop processes (and discards)
// the stale reply before the second reply — no fixed Task.Delay needed.
await pipePair.WorkerWriter.WriteAsync(
CreateCommandReplyEnvelope(timedOutCommand.CorrelationId, MxCommandKind.Ping));
await Task.Delay(TimeSpan.FromMilliseconds(50));
Task<WorkerCommandReply> secondInvokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.GetWorkerInfo),
@@ -142,7 +144,14 @@ public sealed class WorkerClientTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
/// <summary>Verifies that the read loop faults the client when the pipe disconnects.</summary>
/// <summary>
/// Verifies that when the client faults it kills the owned worker process.
/// The assertion waits on <see cref="FakeWorkerProcess.WaitForExitAsync"/>, which
/// completes exactly when <c>Kill</c> runs, instead of polling <c>client.State</c>.
/// Polling state is racy: <see cref="WorkerClient.SetFaulted"/> publishes the
/// <c>Faulted</c> state before it calls <c>KillOwnedProcess</c>, so a state-based
/// wait can observe <c>Faulted</c> while <c>KillCount</c> is still 0.
/// </summary>
[Fact]
public async Task ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess()
{
@@ -164,15 +173,77 @@ public sealed class WorkerClientTests
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
// Deterministic: this completes the instant Kill() runs, with no timing window.
using CancellationTokenSource exitTimeout = new(TestTimeout);
await process.WaitForExitAsync(exitTimeout.Token);
Assert.Equal(WorkerClientState.Faulted, client.State);
Assert.Equal(1, process.KillCount);
Assert.True(process.KillEntireProcessTree);
Assert.True(process.HasExited);
}
/// <summary>
/// Verifies that a worker faulting mid-command — the pipe dropping while an
/// <see cref="WorkerClient.InvokeAsync"/> is still pending — completes the pending
/// invoke task with a <see cref="WorkerClientException"/> carrying the
/// pipe-disconnected error code rather than hanging until the command timeout.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
// The worker received the command but disconnects before replying.
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
await pipePair.DisposeWorkerSideAsync();
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await invokeTask.WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.PipeDisconnected, exception.ErrorCode);
await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
/// <summary>
/// Verifies that a worker emitting a <c>WorkerFault</c> envelope while an
/// <see cref="WorkerClient.InvokeAsync"/> is pending completes the pending invoke
/// task with a <see cref="WorkerClientException"/> carrying the worker-faulted
/// error code.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
await pipePair.WorkerWriter.WriteAsync(CreateWorkerFaultEnvelope("scripted mid-command fault"));
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await invokeTask.WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);
await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
{
@@ -244,15 +315,22 @@ public sealed class WorkerClientTests
Assert.True(process.Disposed);
}
/// <summary>
/// Verifies that a heartbeat envelope updates the last-heartbeat timestamp and worker
/// process id. Uses a <see cref="ManualTimeProvider"/> so the timestamp advance is
/// deterministic instead of relying on a wall-clock <c>Task.Delay</c> exceeding
/// <see cref="DateTimeOffset.UtcNow"/> resolution.
/// </summary>
[Fact]
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
{
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await using WorkerClient client = CreateClient(pipePair, timeProvider: clock);
await CompleteHandshakeAsync(client, pipePair);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
clock.Advance(TimeSpan.FromSeconds(1));
await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));
await WaitUntilAsync(
@@ -260,6 +338,7 @@ public sealed class WorkerClientTests
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.Equal(previousHeartbeat + TimeSpan.FromSeconds(1), client.LastHeartbeatAt);
}
/// <summary>Verifies that the heartbeat monitor faults the client when the heartbeat expires.</summary>
@@ -288,7 +367,8 @@ public sealed class WorkerClientTests
PipePair pipePair,
WorkerClientOptions? options = null,
GatewayMetrics? metrics = null,
WorkerProcessHandle? processHandle = null)
WorkerProcessHandle? processHandle = null,
TimeProvider? timeProvider = null)
{
WorkerFrameProtocolOptions frameOptions = new(SessionId);
WorkerClientConnection connection = new(
@@ -298,7 +378,7 @@ public sealed class WorkerClientTests
frameOptions,
processHandle);
return new WorkerClient(connection, options, metrics);
return new WorkerClient(connection, options, metrics, timeProvider);
}
private static WorkerProcessHandle CreateProcessHandle(FakeWorkerProcess process)
@@ -399,6 +479,23 @@ public sealed class WorkerClientTests
});
}
private static WorkerEnvelope CreateWorkerFaultEnvelope(string diagnosticMessage)
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence: 30,
envelope => envelope.WorkerFault = new WorkerFault
{
Category = WorkerFaultCategory.MxaccessCommandFailed,
DiagnosticMessage = diagnosticMessage,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = diagnosticMessage,
},
});
}
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
{
return CreateWorkerEnvelope(
@@ -509,6 +606,19 @@ public sealed class WorkerClientTests
}
}
/// <summary>Time provider with a manually advanced clock for deterministic timestamp tests.</summary>
private sealed class ManualTimeProvider(DateTimeOffset start) : TimeProvider
{
private DateTimeOffset _now = start;
/// <inheritdoc />
public override DateTimeOffset GetUtcNow() => _now;
/// <summary>Advances the manual clock by the given amount.</summary>
/// <param name="delta">Amount of time to add to the current clock value.</param>
public void Advance(TimeSpan delta) => _now += delta;
}
private sealed class FakeWorkerProcess : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -6,8 +6,9 @@ using MxGateway.Server.Security.Authentication;
namespace MxGateway.Tests.Security.Authentication;
public sealed class ApiKeyAdminCliRunnerTests
public sealed class ApiKeyAdminCliRunnerTests : IDisposable
{
private readonly List<TempDatabaseDirectory> _tempDirectories = [];
/// <summary>Verifies that CreateKeyAsync creates an authenticating key and audits the action.</summary>
[Fact]
public async Task CreateKeyAsync_CreatesAuthenticatingKeyAndAudits()
@@ -249,12 +250,23 @@ public sealed class ApiKeyAdminCliRunnerTests
return services.BuildServiceProvider(validateScopes: true);
}
private static string CreateTempDatabasePath()
/// <summary>Clears SQLite pools and deletes every temporary directory created by this test.</summary>
public void Dispose()
{
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-cli-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(directory);
foreach (TempDatabaseDirectory directory in _tempDirectories)
{
directory.Dispose();
}
return Path.Combine(directory, "gateway-auth.db");
_tempDirectories.Clear();
}
private string CreateTempDatabasePath()
{
TempDatabaseDirectory directory = TempDatabaseDirectory.Create("mxgateway-auth-cli-tests");
_tempDirectories.Add(directory);
return directory.DatabasePath();
}
private static string ReadApiKey(string json)
@@ -11,8 +11,9 @@ namespace MxGateway.Tests.Security.Authentication;
/// <summary>
/// Tests for <see cref="SqliteAuthStore"/>.
/// </summary>
public sealed class SqliteAuthStoreTests
public sealed class SqliteAuthStoreTests : IDisposable
{
private readonly List<TempDatabaseDirectory> _tempDirectories = [];
/// <summary>
/// Verifies that MigrateAsync initializes the database schema.
/// </summary>
@@ -167,12 +168,23 @@ public sealed class SqliteAuthStoreTests
return services.BuildServiceProvider(validateScopes: true);
}
private static string CreateTempDatabasePath()
/// <summary>Clears SQLite pools and deletes every temporary directory created by this test.</summary>
public void Dispose()
{
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(directory);
foreach (TempDatabaseDirectory directory in _tempDirectories)
{
directory.Dispose();
}
return Path.Combine(directory, "gateway-auth.db");
_tempDirectories.Clear();
}
private string CreateTempDatabasePath()
{
TempDatabaseDirectory directory = TempDatabaseDirectory.Create("mxgateway-auth-tests");
_tempDirectories.Add(directory);
return directory.DatabasePath();
}
private static async Task CreateVersionZeroDatabaseAsync(string databasePath)
@@ -0,0 +1,73 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Tests.Security.Authentication;
/// <summary>
/// Disposable temporary directory for SQLite auth-store tests. Each instance owns a
/// unique directory under <c>%TEMP%</c>; <see cref="Dispose"/> clears SQLite connection
/// pools (which otherwise keep the <c>.db</c> file handle open) and deletes the directory
/// so test runs do not leak temp files or open handles.
/// </summary>
internal sealed class TempDatabaseDirectory : IDisposable
{
private bool _disposed;
private TempDatabaseDirectory(string path)
{
Path = path;
}
/// <summary>Gets the path to the temporary directory.</summary>
public string Path { get; }
/// <summary>Creates a new uniquely named temporary directory under the given prefix.</summary>
/// <param name="prefix">Folder name placed under <c>%TEMP%</c> to group related test directories.</param>
public static TempDatabaseDirectory Create(string prefix)
{
string path = System.IO.Path.Combine(
System.IO.Path.GetTempPath(),
prefix,
Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(path);
return new TempDatabaseDirectory(path);
}
/// <summary>Returns a database file path inside this temporary directory.</summary>
/// <param name="fileName">Database file name; defaults to the gateway auth database name.</param>
public string DatabasePath(string fileName = "gateway-auth.db")
{
return System.IO.Path.Combine(Path, fileName);
}
/// <inheritdoc />
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
// Microsoft.Data.Sqlite pools connections by default; clear the pools so the
// underlying file handle is released before the directory is deleted.
SqliteConnection.ClearAllPools();
try
{
if (Directory.Exists(Path))
{
Directory.Delete(Path, recursive: true);
}
}
catch (IOException)
{
// Best-effort cleanup; a transient handle should not fail the test.
}
catch (UnauthorizedAccessException)
{
// Best-effort cleanup; a transient handle should not fail the test.
}
}
}
@@ -1,9 +1,15 @@
using System.Runtime.CompilerServices;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
namespace MxGateway.Tests.Security.Authorization;
@@ -156,6 +162,110 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
Assert.Null(identityAccessor.Current);
}
/// <summary>
/// End-to-end composition test: runs an <c>OpenSession</c> call through the real
/// interceptor in front of the real <see cref="MxAccessGatewayService"/> with a key
/// that lacks the <c>session:open</c> scope, and asserts the interceptor denies the
/// call with <see cref="StatusCode.PermissionDenied"/> before the service runs.
/// </summary>
[Fact]
public async Task InterceptorComposedWithService_OpenSessionMissingScope_DeniesBeforeServiceRuns()
{
GatewayRequestIdentityAccessor identityAccessor = new();
RecordingSessionManager sessionManager = new();
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
identityAccessor);
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.UnaryServerHandler(
new OpenSessionRequest { ClientSessionName = "operator-session" },
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(request, context) => service.OpenSession(request, context)));
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
Assert.Contains(GatewayScopes.SessionOpen, exception.Status.Detail, StringComparison.Ordinal);
Assert.Equal(0, sessionManager.OpenSessionCount);
}
/// <summary>
/// End-to-end composition test: runs an <c>OpenSession</c> call through the real
/// interceptor in front of the real <see cref="MxAccessGatewayService"/> with a key
/// that holds <c>session:open</c>, and asserts the service runs and observes the
/// interceptor-supplied identity.
/// </summary>
[Fact]
public async Task InterceptorComposedWithService_OpenSessionWithScope_RunsServiceWithIdentity()
{
GatewayRequestIdentityAccessor identityAccessor = new();
RecordingSessionManager sessionManager = new();
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.SessionOpen)),
identityAccessor);
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
OpenSessionReply reply = await interceptor.UnaryServerHandler(
new OpenSessionRequest { ClientSessionName = "operator-session" },
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(request, context) => service.OpenSession(request, context));
Assert.Equal("session-1", reply.SessionId);
Assert.Equal(1, sessionManager.OpenSessionCount);
Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
}
/// <summary>
/// End-to-end composition test: an <c>Invoke</c> call through the real interceptor in
/// front of the real service with a key holding only <c>invoke:read</c> is denied
/// because the wrapped command is a write, confirming command-scope mapping is
/// enforced through the full composition.
/// </summary>
[Fact]
public async Task InterceptorComposedWithService_InvokeWriteCommandWithReadScope_DeniesBeforeServiceRuns()
{
GatewayRequestIdentityAccessor identityAccessor = new();
RecordingSessionManager sessionManager = new();
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.InvokeRead)),
identityAccessor);
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
MxCommandRequest request = new()
{
SessionId = "session-1",
Command = new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand { ServerHandle = 1, ItemHandle = 2 },
},
};
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.UnaryServerHandler(
request,
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(req, context) => service.Invoke(req, context)));
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
Assert.Contains(GatewayScopes.InvokeWrite, exception.Status.Detail, StringComparison.Ordinal);
Assert.Equal(0, sessionManager.InvokeCount);
}
private static MxAccessGatewayService CreateService(
ISessionManager sessionManager,
IGatewayRequestIdentityAccessor identityAccessor)
{
return new MxAccessGatewayService(
sessionManager,
identityAccessor,
new AllowAllConstraintEnforcer(),
new MxAccessGrpcRequestValidator(),
new MxAccessGrpcMapper(),
new NoOpEventStreamService(),
new GatewayMetrics(),
NullLogger<MxAccessGatewayService>.Instance);
}
private static GatewayGrpcAuthorizationInterceptor CreateInterceptor(
IApiKeyVerifier apiKeyVerifier,
IGatewayRequestIdentityAccessor identityAccessor,
@@ -188,6 +298,138 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
return new TestServerCallContext([new Metadata.Entry("authorization", authorizationHeader)]);
}
/// <summary>Records whether the gateway service ran past the interceptor for composition tests.</summary>
private sealed class RecordingSessionManager : ISessionManager
{
/// <summary>Gets the number of times OpenSessionAsync was invoked.</summary>
public int OpenSessionCount { get; private set; }
/// <summary>Gets the number of times InvokeAsync was invoked.</summary>
public int InvokeCount { get; private set; }
/// <summary>Gets the last client identity passed to OpenSessionAsync.</summary>
public string? LastClientIdentity { get; private set; }
/// <inheritdoc />
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
OpenSessionCount++;
LastClientIdentity = clientIdentity;
GatewaySession session = new(
"session-1",
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
clientIdentity ?? "client",
"client-session",
"client-correlation",
TimeSpan.FromSeconds(7),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(10),
DateTimeOffset.UtcNow);
return Task.FromResult(session);
}
/// <inheritdoc />
public bool TryGetSession(string sessionId, out GatewaySession session)
{
session = null!;
return false;
}
/// <inheritdoc />
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
InvokeCount++;
return Task.FromResult(new WorkerCommandReply());
}
/// <inheritdoc />
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
return AsyncEnumerable.Empty<WorkerEvent>();
}
/// <inheritdoc />
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
}
/// <inheritdoc />
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
/// <inheritdoc />
public Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
/// <summary>Event stream service that yields nothing; alarm/event RPCs are not under test here.</summary>
private sealed class NoOpEventStreamService : IEventStreamService
{
/// <inheritdoc />
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
}
/// <summary>Constraint enforcer that permits every operation for composition tests.</summary>
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class FakeApiKeyVerifier(ApiKeyVerificationResult result) : IApiKeyVerifier
{
/// <summary>Gets whether the verifier was called.</summary>
@@ -61,4 +61,42 @@ public sealed class GatewayGrpcScopeResolverTests
Assert.Equal(expectedScope, scope);
}
/// <summary>
/// Verifies that an unmapped request type fails closed: the resolver returns the
/// most-restrictive <see cref="GatewayScopes.Admin"/> scope rather than a permissive
/// default, so a newly added RPC that is never mapped is denied to ordinary keys.
/// </summary>
[Fact]
public void ResolveRequiredScope_UnmappedRequestType_FailsClosedToAdminScope()
{
GatewayGrpcScopeResolver resolver = new();
string scope = resolver.ResolveRequiredScope(new UnmappedRequest());
Assert.Equal(GatewayScopes.Admin, scope);
}
/// <summary>
/// Verifies that an <see cref="MxCommandRequest"/> with an unrecognized command kind
/// resolves to the read scope rather than silently granting write or admin access.
/// </summary>
[Fact]
public void ResolveRequiredScope_UnknownInvokeCommandKind_ReturnsInvokeReadScope()
{
GatewayGrpcScopeResolver resolver = new();
string scope = resolver.ResolveRequiredScope(new MxCommandRequest
{
Command = new MxCommand
{
Kind = (MxCommandKind)9999,
},
});
Assert.Equal(GatewayScopes.InvokeRead, scope);
}
/// <summary>Request type intentionally not mapped by the scope resolver.</summary>
private sealed class UnmappedRequest;
}
@@ -1,4 +1,3 @@
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
@@ -19,7 +18,7 @@ public sealed class WorkerFrameProtocolTests
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream stream = new();
using MemoryStream stream = new();
WorkerEnvelope original = CreateGatewayHelloEnvelope();
WorkerFrameWriter writer = new(stream, options);
@@ -39,7 +38,7 @@ public sealed class WorkerFrameProtocolTests
WorkerFrameProtocolOptions options = CreateOptions();
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
envelope.ProtocolVersion++;
MemoryStream stream = new(CreateFrame(envelope));
using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
@@ -56,7 +55,7 @@ public sealed class WorkerFrameProtocolTests
WorkerFrameProtocolOptions options = CreateOptions();
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
envelope.SessionId = "different-session";
MemoryStream stream = new(CreateFrame(envelope));
using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
@@ -71,7 +70,7 @@ public sealed class WorkerFrameProtocolTests
public async Task ReadAsync_WithMalformedLength_ThrowsMalformedLength()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream stream = new(new byte[sizeof(uint)]);
using MemoryStream stream = new(new byte[sizeof(uint)]);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
@@ -86,7 +85,7 @@ public sealed class WorkerFrameProtocolTests
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
using MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
@@ -101,7 +100,7 @@ public sealed class WorkerFrameProtocolTests
public async Task WriteAsync_WithConcurrentCalls_SerializesCompleteFrames()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream stream = new();
using MemoryStream stream = new();
WorkerFrameWriter writer = new(stream, options);
await Task.WhenAll(
@@ -24,10 +24,10 @@ public sealed class WorkerPipeSessionTests
public async Task CompleteStartupHandshakeAsync_WithValidGatewayHello_SendsHelloThenReady()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
using MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
inbound.Position = 0;
MemoryStream outbound = new();
using MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
bool initialized = false;
@@ -55,10 +55,10 @@ public sealed class WorkerPipeSessionTests
public async Task CompleteStartupHandshakeAsync_WithWrongNonce_FaultsBeforeInitialization()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
using MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(nonce: "wrong"));
inbound.Position = 0;
MemoryStream outbound = new();
using MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
bool initialized = false;
@@ -83,10 +83,10 @@ public sealed class WorkerPipeSessionTests
public async Task CompleteStartupHandshakeAsync_WithWrongProtocol_FaultsBeforeInitialization()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
using MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(supportedProtocolVersion: 999));
inbound.Position = 0;
MemoryStream outbound = new();
using MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
bool initialized = false;
@@ -110,8 +110,8 @@ public sealed class WorkerPipeSessionTests
public async Task CompleteStartupHandshakeAsync_WithMalformedFrame_WritesWorkerFault()
{
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
MemoryStream outbound = new();
using MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
using MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
bool initialized = false;
@@ -137,10 +137,10 @@ public sealed class WorkerPipeSessionTests
{
const int hresult = unchecked((int)0x80040154);
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
using MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
inbound.Position = 0;
MemoryStream outbound = new();
using MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
await Assert.ThrowsAsync<COMException>(
@@ -293,7 +293,11 @@ public sealed class MxAccessStaSessionTests
/// <summary>
/// Gap 2: Verifies that the STA poll loop stops when the session is disposed —
/// no further PollOnce calls after disposal.
/// no further PollOnce calls after disposal. <see cref="MxAccessStaSession.Dispose"/>
/// joins the poll task before returning, so once Dispose returns no PollOnce
/// call can still be in flight. The test asserts the poll count is frozen
/// immediately after Dispose and stays frozen — deterministic, with no
/// elapsed-time "no further polls" window that a slow agent could race.
/// </summary>
[Fact]
public async Task Dispose_StopsAlarmPollLoop()
@@ -302,7 +306,11 @@ public sealed class MxAccessStaSessionTests
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
MxAccessStaSession session = new(
// using declaration: if an assertion below throws before the explicit
// Dispose, the session (its STA poll loop and alarm handler) is still
// torn down. Dispose is idempotent, so the explicit call mid-test and
// the using-scope call do not conflict.
using MxAccessStaSession session = new(
runtime,
factory,
eventSink,
@@ -320,11 +328,15 @@ public sealed class MxAccessStaSessionTests
Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose.");
// Dispose joins the poll task; when it returns the loop has stopped
// and no PollOnce call is still running.
session.Dispose();
int pollCountAtDispose = handler.PollCount;
// Wait 1 second and verify no further polls occur.
await Task.Delay(1000);
// The count is already frozen — re-reading after a yield must not
// observe any further poll. This is a deterministic check, not a
// timing window: a poll cannot start once the joined loop has exited.
await Task.Yield();
Assert.Equal(pollCountAtDispose, handler.PollCount);
}
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Sta;
@@ -27,7 +26,15 @@ public sealed class StaRuntimeTests
Assert.Equal(ApartmentState.STA, observation.ApartmentState);
}
/// <summary>Verifies that InvokeAsync wakes the idle pump when a command is queued.</summary>
/// <summary>
/// Verifies that InvokeAsync wakes the idle pump when a command is queued.
/// The pump is configured with a 30-second idle period — far longer than
/// any reasonable test run — so the awaited command completing at all proves
/// the command wake event (not the idle pump tick) drove the dispatch. No
/// wall-clock assertion is used: a loaded CI agent can stall an otherwise
/// correct dispatch past an arbitrary millisecond budget, which would be a
/// false failure.
/// </summary>
[Fact]
public async Task InvokeAsync_WakesIdlePumpForQueuedCommand()
{
@@ -37,15 +44,10 @@ public sealed class StaRuntimeTests
new StaMessagePump(),
TimeSpan.FromSeconds(30));
runtime.Start();
Stopwatch stopwatch = Stopwatch.StartNew();
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
stopwatch.Stop();
Assert.Equal(runtime.StaThreadId, threadId);
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
$"Command took {stopwatch.Elapsed} to execute, so the command wake event did not wake the STA promptly.");
}
/// <summary>Verifies that Shutdown stops the thread and uninitializes the COM apartment.</summary>
@@ -580,13 +580,27 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
RequestShutdown();
// Cancel and discard the STA poll loop.
// Cancel the STA poll loop and join it before disposing the alarm
// handler. Joining (rather than discarding alarmPollTask) makes the
// stop deterministic: once Dispose returns, no further PollOnce calls
// can be in flight, so callers and tests can rely on a frozen poll
// count instead of an elapsed-time "no further polls" window.
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
Task? pollTaskToJoin = alarmPollTask;
alarmPollCts = null;
alarmPollTask = null;
if (pollCtsToDispose is not null)
{
try { pollCtsToDispose.Cancel(); } catch { }
if (pollTaskToJoin is not null)
{
try
{
pollTaskToJoin.Wait(TimeSpan.FromSeconds(5));
}
catch (AggregateException) { }
catch (ObjectDisposedException) { }
}
try { pollCtsToDispose.Dispose(); } catch { }
}