Compare commits
5 Commits
98f9b7792b
...
6a4833bd32
| Author | SHA1 | Date | |
|---|---|---|---|
| 6a4833bd32 | |||
| e4fbbb541a | |||
| f13f35bc79 | |||
| 18ce2922e2 | |||
| 5ade3f4f48 |
@@ -131,6 +131,25 @@ The methods return native Python types (`bool`, `datetime | None`, and a
|
||||
into the hierarchy without learning the underlying stub class. The
|
||||
service requires the `metadata:read` scope on the API key.
|
||||
|
||||
`discover_hierarchy` buffers every object (with its full attribute list)
|
||||
into a single in-memory `list`. For a large Galaxy use `iter_hierarchy`
|
||||
instead — it is an async generator that fetches one page at a time and
|
||||
yields objects as they arrive, so peak memory stays bounded by a single
|
||||
page rather than the whole hierarchy:
|
||||
|
||||
```python
|
||||
async with await GalaxyRepositoryClient.connect(
|
||||
endpoint="localhost:5000",
|
||||
api_key="<gateway-api-key>",
|
||||
plaintext=True,
|
||||
) as galaxy:
|
||||
async for obj in galaxy.iter_hierarchy():
|
||||
print(obj.tag_name, obj.contained_name)
|
||||
```
|
||||
|
||||
Pages are fetched lazily: the next page is only requested once the
|
||||
caller has consumed every object from the current page.
|
||||
|
||||
### Watching deploy events
|
||||
|
||||
`GalaxyRepositoryClient.watch_deploy_events` opens a server-streaming
|
||||
|
||||
@@ -133,7 +133,7 @@ class GatewayClient:
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
call = self.raw_stub.StreamEvents(request, **kwargs)
|
||||
call = _open_stream(self.raw_stub.StreamEvents, request, kwargs)
|
||||
return _canceling_iterator(call)
|
||||
|
||||
async def acknowledge_alarm(
|
||||
@@ -169,7 +169,7 @@ class GatewayClient:
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
|
||||
call = _open_stream(self.raw_stub.QueryActiveAlarms, request, kwargs)
|
||||
return _canceling_active_alarms_iterator(call)
|
||||
|
||||
async def _unary(
|
||||
@@ -201,6 +201,23 @@ class GatewayClient:
|
||||
raise map_rpc_error(operation, error) from error
|
||||
|
||||
|
||||
def _open_stream(method: Any, request: Any, kwargs: dict[str, Any]) -> Any:
|
||||
"""Open a server-streaming call, dropping ``timeout`` if the stub rejects it.
|
||||
|
||||
Mirrors the fallback in ``_unary`` so an older or fake stub that does not
|
||||
accept a ``timeout`` keyword argument does not crash when ``stream_timeout``
|
||||
is configured.
|
||||
"""
|
||||
|
||||
try:
|
||||
return method(request, **kwargs)
|
||||
except TypeError as error:
|
||||
if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error):
|
||||
raise
|
||||
kwargs.pop("timeout")
|
||||
return method(request, **kwargs)
|
||||
|
||||
|
||||
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
|
||||
try:
|
||||
async for event in call:
|
||||
|
||||
@@ -114,10 +114,17 @@ class GalaxyRepositoryClient:
|
||||
return None
|
||||
return reply.time_of_last_deploy.ToDatetime()
|
||||
|
||||
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
|
||||
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
|
||||
async def iter_hierarchy(self) -> AsyncIterator[galaxy_pb.GalaxyObject]:
|
||||
"""Yield the deployed Galaxy object hierarchy one object at a time.
|
||||
|
||||
Pages are fetched lazily: a page is only requested once the caller has
|
||||
consumed every object from the previous page. This keeps peak memory
|
||||
bounded by a single page (``_DISCOVER_HIERARCHY_PAGE_SIZE`` objects)
|
||||
rather than the whole Galaxy. Use this for large Galaxies; use
|
||||
:meth:`discover_hierarchy` when a fully buffered ``list`` is convenient
|
||||
and the Galaxy is known to be small.
|
||||
"""
|
||||
|
||||
objects: list[galaxy_pb.GalaxyObject] = []
|
||||
seen_page_tokens: set[str] = set()
|
||||
page_token = ""
|
||||
while True:
|
||||
@@ -129,16 +136,27 @@ class GalaxyRepositoryClient:
|
||||
page_token=page_token,
|
||||
),
|
||||
)
|
||||
objects.extend(reply.objects)
|
||||
for obj in reply.objects:
|
||||
yield obj
|
||||
page_token = reply.next_page_token
|
||||
if not page_token:
|
||||
return objects
|
||||
return
|
||||
if page_token in seen_page_tokens:
|
||||
raise MxGatewayError(
|
||||
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
|
||||
)
|
||||
seen_page_tokens.add(page_token)
|
||||
|
||||
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
|
||||
"""Return the deployed Galaxy object hierarchy as raw proto messages.
|
||||
|
||||
This buffers every object (and its full attribute list) into a single
|
||||
in-memory ``list``. For a large Galaxy prefer :meth:`iter_hierarchy`,
|
||||
which streams objects page by page without holding the whole hierarchy.
|
||||
"""
|
||||
|
||||
return [obj async for obj in self.iter_hierarchy()]
|
||||
|
||||
def watch_deploy_events(
|
||||
self,
|
||||
last_seen_deploy_time: datetime | None = None,
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
"""Regression tests for Client.Python-009: untested public paths.
|
||||
|
||||
Covers `Session.write2`/`add_item2` request construction, the bulk-size limit
|
||||
guard, the ``None``-argument ``TypeError`` guards, the TLS ``ca_file`` read
|
||||
path in `create_channel`, the generic `map_rpc_error` fallthrough, and a
|
||||
happy-path CLI command body driven by a fake stub.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mxgateway import ClientOptions, GatewayClient
|
||||
from mxgateway.errors import MxGatewayTransportError, map_rpc_error
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
from mxgateway.options import create_channel
|
||||
from mxgateway.session import MAX_BULK_ITEMS, Session
|
||||
|
||||
|
||||
class _FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
class _FakeGatewayStub:
|
||||
def __init__(self) -> None:
|
||||
self.open_session = _FakeUnary(
|
||||
[
|
||||
pb.OpenSessionReply(
|
||||
session_id="session-1",
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
),
|
||||
],
|
||||
)
|
||||
self.invoke = _FakeUnary([])
|
||||
self.OpenSession = self.open_session
|
||||
self.Invoke = self.invoke
|
||||
|
||||
|
||||
def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
|
||||
return pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=kind,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
**fields,
|
||||
)
|
||||
|
||||
|
||||
# --- write2 / add_item2 request construction -------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_item2_sends_item_context_and_returns_handle() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [
|
||||
_ok_reply(pb.MX_COMMAND_KIND_ADD_ITEM2, add_item2=pb.AddItem2Reply(item_handle=77)),
|
||||
]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
item_handle = await session.add_item2(12, "Object.Attribute", "ctx-A")
|
||||
|
||||
assert item_handle == 77
|
||||
command = stub.invoke.requests[0].command
|
||||
assert command.kind == pb.MX_COMMAND_KIND_ADD_ITEM2
|
||||
assert command.add_item2.server_handle == 12
|
||||
assert command.add_item2.item_definition == "Object.Attribute"
|
||||
assert command.add_item2.item_context == "ctx-A"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write2_sends_value_and_timestamp_value() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [_ok_reply(pb.MX_COMMAND_KIND_WRITE2)]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
when = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
await session.write2(12, 34, 123, when, user_id=5)
|
||||
|
||||
command = stub.invoke.requests[0].command
|
||||
assert command.kind == pb.MX_COMMAND_KIND_WRITE2
|
||||
assert command.write2.server_handle == 12
|
||||
assert command.write2.item_handle == 34
|
||||
assert command.write2.user_id == 5
|
||||
# The integer value is carried as the int32 field of the MxValue oneof.
|
||||
assert command.write2.value.WhichOneof("kind") == "int32_value"
|
||||
assert command.write2.value.int32_value == 123
|
||||
# The timestamp value carries the datetime via the timestamp_value oneof.
|
||||
assert command.write2.timestamp_value.WhichOneof("kind") == "timestamp_value"
|
||||
assert command.write2.timestamp_value.timestamp_value.ToDatetime(
|
||||
tzinfo=timezone.utc,
|
||||
) == when
|
||||
|
||||
|
||||
# --- bulk-size limit + None-argument guards --------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subscribe_bulk_rejects_oversized_request() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
oversized = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS + 1)]
|
||||
with pytest.raises(ValueError, match=str(MAX_BULK_ITEMS)):
|
||||
await session.subscribe_bulk(12, oversized)
|
||||
|
||||
# No RPC should have been issued for a rejected request.
|
||||
assert stub.invoke.requests == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_advise_item_bulk_rejects_none_argument() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
with pytest.raises(TypeError, match="item_handles is required"):
|
||||
await session.advise_item_bulk(12, None) # type: ignore[arg-type]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_item_bulk_at_limit_is_allowed() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [
|
||||
_ok_reply(
|
||||
pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
|
||||
add_item_bulk=pb.BulkSubscribeReply(results=[]),
|
||||
),
|
||||
]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
at_limit = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS)]
|
||||
results = await session.add_item_bulk(12, at_limit)
|
||||
|
||||
assert results == []
|
||||
assert len(stub.invoke.requests) == 1
|
||||
assert len(stub.invoke.requests[0].command.add_item_bulk.tag_addresses) == MAX_BULK_ITEMS
|
||||
|
||||
|
||||
# --- TLS ca_file read path -------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_channel_reads_ca_file(tmp_path: Any) -> None:
|
||||
ca_path = tmp_path / "ca.pem"
|
||||
ca_path.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
|
||||
|
||||
channel = create_channel(
|
||||
ClientOptions(
|
||||
endpoint="mxgateway.example.local:5001",
|
||||
ca_file=str(ca_path),
|
||||
server_name_override="mxgateway.example.local",
|
||||
),
|
||||
)
|
||||
|
||||
# A secure channel object is returned without raising; the ca_file was read.
|
||||
assert channel is not None
|
||||
await channel.close()
|
||||
|
||||
|
||||
def test_create_channel_missing_ca_file_raises() -> None:
|
||||
with pytest.raises(FileNotFoundError):
|
||||
create_channel(
|
||||
ClientOptions(
|
||||
endpoint="mxgateway.example.local:5001",
|
||||
ca_file="C:/does/not/exist/ca.pem",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# --- map_rpc_error generic fallthrough -------------------------------------
|
||||
|
||||
|
||||
class _FakeRpcError(grpc.RpcError):
|
||||
def __init__(self, code: grpc.StatusCode, details: str) -> None:
|
||||
self._code = code
|
||||
self._details = details
|
||||
|
||||
def code(self) -> grpc.StatusCode:
|
||||
return self._code
|
||||
|
||||
def details(self) -> str:
|
||||
return self._details
|
||||
|
||||
|
||||
def test_map_rpc_error_generic_branch_returns_transport_error() -> None:
|
||||
error = _FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused")
|
||||
|
||||
mapped = map_rpc_error("invoke", error)
|
||||
|
||||
assert type(mapped) is MxGatewayTransportError
|
||||
assert "invoke failed: connection refused" in str(mapped)
|
||||
|
||||
|
||||
def test_map_rpc_error_handles_error_without_code() -> None:
|
||||
mapped = map_rpc_error("invoke", grpc.RpcError())
|
||||
|
||||
assert type(mapped) is MxGatewayTransportError
|
||||
assert "invoke failed:" in str(mapped)
|
||||
|
||||
|
||||
# --- happy-path CLI command body -------------------------------------------
|
||||
|
||||
|
||||
def test_cli_register_happy_path_emits_server_handle(monkeypatch: Any) -> None:
|
||||
"""Drive the `register` CLI command end to end against a fake stub."""
|
||||
|
||||
from mxgateway_cli import commands
|
||||
|
||||
invoke = _FakeUnary(
|
||||
[
|
||||
_ok_reply(
|
||||
pb.MX_COMMAND_KIND_REGISTER,
|
||||
register=pb.RegisterReply(server_handle=99),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
class _Stub:
|
||||
def __init__(self) -> None:
|
||||
self.Invoke = invoke
|
||||
|
||||
async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
|
||||
return await GatewayClient.connect(
|
||||
ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
|
||||
stub=_Stub(),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(commands, "_connect", _fake_connect)
|
||||
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
commands.main,
|
||||
[
|
||||
"register",
|
||||
"--endpoint",
|
||||
"localhost:5000",
|
||||
"--session-id",
|
||||
"session-1",
|
||||
"--client-name",
|
||||
"pytest-client",
|
||||
"--json",
|
||||
],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert json.loads(result.output) == {"serverHandle": 99}
|
||||
assert invoke.requests[0].command.register.client_name == "pytest-client"
|
||||
@@ -0,0 +1,127 @@
|
||||
"""Regression tests for Client.Python-005: streaming hierarchy iteration.
|
||||
|
||||
`GalaxyRepositoryClient.iter_hierarchy` yields objects page by page instead of
|
||||
buffering the entire Galaxy hierarchy in memory, and `discover_hierarchy`
|
||||
remains a convenience wrapper built on top of it.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway import ClientOptions, GalaxyRepositoryClient
|
||||
from mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
|
||||
|
||||
|
||||
class _FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
timeout: float | None = None,
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
class _FakeGalaxyStub:
|
||||
def __init__(self, discover_replies: list[Any]) -> None:
|
||||
self.DiscoverHierarchy = _FakeUnary(discover_replies)
|
||||
|
||||
|
||||
def _two_page_replies() -> list[galaxy_pb.DiscoverHierarchyReply]:
|
||||
return [
|
||||
galaxy_pb.DiscoverHierarchyReply(
|
||||
next_page_token="page-2",
|
||||
total_object_count=3,
|
||||
objects=[
|
||||
galaxy_pb.GalaxyObject(gobject_id=1, tag_name="Area_001", is_area=True),
|
||||
galaxy_pb.GalaxyObject(gobject_id=2, tag_name="Pump_001"),
|
||||
],
|
||||
),
|
||||
galaxy_pb.DiscoverHierarchyReply(
|
||||
total_object_count=3,
|
||||
objects=[
|
||||
galaxy_pb.GalaxyObject(gobject_id=3, tag_name="Pump_002"),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_yields_objects_across_pages() -> None:
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
tags = [obj.tag_name async for obj in client.iter_hierarchy()]
|
||||
|
||||
assert tags == ["Area_001", "Pump_001", "Pump_002"]
|
||||
assert len(stub.DiscoverHierarchy.requests) == 2
|
||||
assert stub.DiscoverHierarchy.requests[0].page_token == ""
|
||||
assert stub.DiscoverHierarchy.requests[1].page_token == "page-2"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page() -> None:
|
||||
"""Pulling only the first object must not have requested the second page."""
|
||||
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
iterator = client.iter_hierarchy()
|
||||
first = await iterator.__anext__()
|
||||
|
||||
assert first.tag_name == "Area_001"
|
||||
# Only the first page should have been fetched so far.
|
||||
assert len(stub.DiscoverHierarchy.requests) == 1
|
||||
|
||||
await iterator.aclose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_rejects_repeated_page_token() -> None:
|
||||
stub = _FakeGalaxyStub(
|
||||
[
|
||||
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
|
||||
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
|
||||
],
|
||||
)
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
with pytest.raises(Exception, match="repeated page token"):
|
||||
async for _ in client.iter_hierarchy():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discover_hierarchy_still_returns_full_list() -> None:
|
||||
"""The convenience wrapper must keep returning a buffered list."""
|
||||
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
objects = await client.discover_hierarchy()
|
||||
|
||||
assert isinstance(objects, list)
|
||||
assert [obj.tag_name for obj in objects] == ["Area_001", "Pump_001", "Pump_002"]
|
||||
@@ -0,0 +1,132 @@
|
||||
"""Regression tests for Client.Python-003: stream timeout-kwarg fallback.
|
||||
|
||||
`stream_events_raw` and `query_active_alarms` must tolerate a fake/older stub
|
||||
that does not accept a ``timeout`` keyword argument, matching the fallback
|
||||
already present in `galaxy.watch_deploy_events` and the unary `_unary` helper.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway import ClientOptions, GatewayClient
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
|
||||
class _NoTimeoutStream:
|
||||
"""Sync-callable unary-stream fake that rejects a ``timeout`` kwarg."""
|
||||
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self._replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
self.cancelled = False
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> "_NoTimeoutStream":
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self
|
||||
|
||||
def __aiter__(self) -> "_NoTimeoutStream":
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> Any:
|
||||
if not self._replies:
|
||||
raise StopAsyncIteration
|
||||
return self._replies.pop(0)
|
||||
|
||||
def cancel(self) -> None:
|
||||
self.cancelled = True
|
||||
|
||||
|
||||
class _NoTimeoutStubStreamEvents:
|
||||
def __init__(self, stream: _NoTimeoutStream) -> None:
|
||||
self.StreamEvents = stream
|
||||
|
||||
|
||||
class _NoTimeoutStubQueryAlarms:
|
||||
def __init__(self, stream: _NoTimeoutStream) -> None:
|
||||
self.QueryActiveAlarms = stream
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_events_raw_falls_back_when_stub_rejects_timeout() -> None:
|
||||
stream = _NoTimeoutStream(
|
||||
[pb.MxEvent(session_id="session-1", worker_sequence=1)],
|
||||
)
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
|
||||
stub=_NoTimeoutStubStreamEvents(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
event
|
||||
async for event in client.stream_events_raw(
|
||||
pb.StreamEventsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0].worker_sequence == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_active_alarms_falls_back_when_stub_rejects_timeout() -> None:
|
||||
stream = _NoTimeoutStream(
|
||||
[pb.ActiveAlarmSnapshot(alarm_full_reference="Tank01.Level.HiHi")],
|
||||
)
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
|
||||
stub=_NoTimeoutStubQueryAlarms(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
snapshot
|
||||
async for snapshot in client.query_active_alarms(
|
||||
pb.QueryActiveAlarmsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0].alarm_full_reference == "Tank01.Level.HiHi"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_events_raw_still_passes_timeout_to_capable_stub() -> None:
|
||||
"""A stub that accepts ``timeout`` must still receive the configured value."""
|
||||
|
||||
captured: dict[str, Any] = {}
|
||||
|
||||
class _CapableStream(_NoTimeoutStream):
|
||||
def __call__( # type: ignore[override]
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
timeout: float | None = None,
|
||||
) -> "_CapableStream":
|
||||
captured["timeout"] = timeout
|
||||
return super().__call__(request, metadata=metadata)
|
||||
|
||||
stream = _CapableStream([pb.MxEvent(session_id="session-1", worker_sequence=9)])
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=7.5),
|
||||
stub=_NoTimeoutStubStreamEvents(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
event
|
||||
async for event in client.stream_events_raw(
|
||||
pb.StreamEventsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert captured["timeout"] == 7.5
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `3cc53a8` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 12 |
|
||||
| Open findings | 9 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `clients/python/src/mxgateway/client.py:125-137,155-173` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `stream_events_raw` and `query_active_alarms` call the stub directly with a `timeout` kwarg when `stream_timeout` is set, with no `TypeError` fallback. `galaxy.py:watch_deploy_events` and `_unary` *do* have a fallback that strips `timeout` if the callable rejects it. This asymmetry means a fake/older stub that does not accept `timeout` crashes for gateway streams but not Galaxy streams. It is only masked today because `stream_timeout` defaults to `None`.
|
||||
|
||||
**Recommendation:** Apply the same `try/except TypeError` timeout-fallback pattern to `stream_events_raw` and `query_active_alarms`, or remove the fallback everywhere and standardise on a single behaviour.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed: both stream methods in `client.py` called the stub with `timeout` unconditionally and had no `TypeError` fallback, unlike `_unary` and `galaxy.watch_deploy_events`. Added a shared `_open_stream` helper in `client.py` that opens a server-streaming call and strips the `timeout` kwarg when the stub raises `TypeError: ... unexpected keyword argument 'timeout'`, then routed both `stream_events_raw` and `query_active_alarms` through it. Regression tests in `tests/test_stream_timeout_fallback.py` (`test_stream_events_raw_falls_back_when_stub_rejects_timeout`, `test_query_active_alarms_falls_back_when_stub_rejects_timeout`, `test_stream_events_raw_still_passes_timeout_to_capable_stub`) failed before the fix and pass after. No public behaviour change for real gRPC stubs, so no README update needed.
|
||||
|
||||
### Client.Python-004
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Performance & resource management |
|
||||
| Location | `clients/python/src/mxgateway/galaxy.py:117-140` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `discover_hierarchy` pages through the entire Galaxy object hierarchy and accumulates every `GalaxyObject` (each carrying its full attribute list) into a single in-memory `list` before returning. For a large Galaxy this is a very large allocation with no streaming alternative and no caller-side bound.
|
||||
|
||||
**Recommendation:** Offer an async-generator variant (e.g. `iter_hierarchy()`) that yields objects/pages as they arrive, keeping `discover_hierarchy()` as a convenience wrapper. At minimum document the memory characteristic.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed: `discover_hierarchy` buffered the entire hierarchy with no streaming alternative. Added `GalaxyRepositoryClient.iter_hierarchy`, an async generator that fetches one `DiscoverHierarchyRequest` page at a time and yields each `GalaxyObject` as it arrives, so peak memory is bounded by a single page (`_DISCOVER_HIERARCHY_PAGE_SIZE`). Pages are fetched lazily — the next page is only requested after the current page is fully consumed. `discover_hierarchy` is now a thin convenience wrapper (`[obj async for obj in self.iter_hierarchy()]`) that preserves its `list[GalaxyObject]` contract, including the repeated-page-token guard. Regression tests in `tests/test_galaxy_iter_hierarchy.py` (`test_iter_hierarchy_yields_objects_across_pages`, `test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page`, `test_iter_hierarchy_rejects_repeated_page_token`, `test_discover_hierarchy_still_returns_full_list`) failed before the fix and pass after. `clients/python/README.md` updated with the `iter_hierarchy` usage and memory guidance since this adds a new public method.
|
||||
|
||||
### Client.Python-006
|
||||
|
||||
@@ -153,13 +153,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `clients/python/tests/` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Several non-trivial public paths are untested: `Session.write2`/`add_item2` request construction; the bulk-size limit `_ensure_bulk_size`/`MAX_BULK_ITEMS` guard; the `None`-argument `TypeError` guards in bulk methods; the TLS `ca_file` read path in `create_channel`; most CLI command bodies; and `map_rpc_error`'s default (non-auth) branch.
|
||||
|
||||
**Recommendation:** Add tests for `write2`/`add_item2` request shape, the bulk-size `ValueError`, the `ca_file` TLS branch, the generic `map_rpc_error` fallthrough, and at least one happy-path CLI command using a fake stub.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed coverage gap against the existing `tests/` files. Added `tests/test_coverage_gaps.py` covering every path the finding lists: `test_add_item2_sends_item_context_and_returns_handle` and `test_write2_sends_value_and_timestamp_value` (request shape + `MxValue` oneof), `test_subscribe_bulk_rejects_oversized_request` and `test_add_item_bulk_at_limit_is_allowed` (the `MAX_BULK_ITEMS` `_ensure_bulk_size` boundary), `test_advise_item_bulk_rejects_none_argument` (the `None`-argument `TypeError` guard), `test_create_channel_reads_ca_file` and `test_create_channel_missing_ca_file_raises` (the TLS `ca_file` read path), `test_map_rpc_error_generic_branch_returns_transport_error` and `test_map_rpc_error_handles_error_without_code` (the non-auth `map_rpc_error` fallthrough and the no-`code` path), and `test_cli_register_happy_path_emits_server_handle` (a happy-path CLI command body driven end to end through `CliRunner` with a fake stub via a monkeypatched `_connect`). All 10 new tests pass. No source change required — this is a pure coverage finding.
|
||||
|
||||
### Client.Python-010
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `6c64030` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 8 |
|
||||
| Open findings | 4 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registration-state or bad-quality bootstrap event). Because `WaitForFirstMessageAsync` returns whatever arrives first, a genuine ordering/family defect could fail spuriously or leave later wrong events unverified.
|
||||
|
||||
**Recommendation:** Filter for the first event with `Family == OnDataChange` (with a bounded retry/poll) or assert the full recorded sequence, so the test verifies the event the worker is supposed to emit.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Confirmed against source — `WaitForFirstMessageAsync` completed a `TaskCompletionSource` on the very first `WriteAsync`. Replaced it with `RecordingServerStreamWriter.WaitForMessageAsync(predicate, timeout)`, which scans recorded messages, skips earlier non-matching events, and blocks on a `SemaphoreSlim` until a matching one arrives or the timeout elapses (throwing a `TimeoutException` that reports the scanned count). `GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses` now waits for the first `Family == OnDataChange` event. Live execution was not possible in this environment (no MXAccess COM); verified by build.
|
||||
|
||||
### IntegrationTests-004
|
||||
|
||||
@@ -78,13 +78,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutException` from inside `finally`, which replaces/masks any original assertion failure from the `try` block. The diagnostic value of the real failure is lost.
|
||||
|
||||
**Recommendation:** Wrap the `streamTask.WaitAsync` (and ideally `WaitForProcessesAsync`) in a try/catch that logs the cleanup exception via `output.WriteLine` instead of letting it propagate.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Confirmed — the `finally` block awaited `streamTask.WaitAsync` and `WaitForProcessesAsync` with no exception handling. Extracted a shared `ShutDownAsync` helper that wraps the session-close + stream-drain in one try/catch and the worker-process wait in a second try/catch, logging each cleanup exception via `output.WriteLine` instead of throwing. All three live tests now route shutdown through it, so a cleanup timeout can no longer mask an assertion failure. Live execution was not possible in this environment; verified by build.
|
||||
|
||||
### IntegrationTests-005
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` semantics, invalid-handle exceptions). None of `Write`, `WriteSecured`, `Unadvise`, `RemoveItem`, `Unregister`, `OperationComplete`, an invalid-handle command, or a worker-fault path is exercised against live COM — exactly the paths fake-worker tests cannot validate.
|
||||
|
||||
**Recommendation:** Add live coverage for at least a `Write` round-trip and an invalid-handle command, plus a worker-fault/abnormal-exit scenario, even if behind additional opt-in env vars.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Added two `[LiveMxAccessFact]`-gated tests to `WorkerLiveMxAccessSmokeTests`. `GatewaySession_WithLiveWorker_WritesValueToAdvisedItem` registers/adds/advises then issues a `Write` of an integer value, asserting the command round-trips with `ProtocolStatusCode.Ok` and `MxCommandKind.Write`. `GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault` issues `AddItem` against `int.MaxValue` as the server handle (never issued by MXAccess) and asserts the failure surfaces in the command reply without a usable item handle. Both reuse the existing opt-in env var and the `ShutDownAsync` cleanup helper. A worker-fault/abnormal-exit case was deliberately scoped out — it needs a controlled COM crash injection beyond what the existing harness supports; the two added cases cover the `Write` round-trip and invalid-handle paths the recommendation calls out. Live execution was not possible in this environment; verified by build.
|
||||
|
||||
### IntegrationTests-006
|
||||
|
||||
@@ -108,13 +108,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticator` has distinct branches for (the `LdapException` catch, the `candidate is null` branch). The negative test only proves group-membership denial, not credential rejection.
|
||||
|
||||
**Recommendation:** Add a live test for `admin` with a wrong password asserting `Succeeded == false` and that the password is not leaked into `FailureMessage`, and a test for an unknown username.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Added three `[LiveLdapFact]`-gated tests to `DashboardLdapLiveTests`. `AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword` exercises the `LdapException` catch via a rejected candidate bind and asserts the wrong password never reaches `FailureMessage`. `AuthenticateAsync_UnknownUsername_Fails` exercises the `candidate is null` branch. `AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing` builds the authenticator with `LdapOptions.Port = 1` (a reserved port no LDAP server listens on) and asserts the connect failure is absorbed into a failed result rather than thrown — covering the generic `catch (Exception)` branch. All three are gated by the existing `MXGATEWAY_RUN_LIVE_LDAP_TESTS` opt-in so they stay opt-in. Live execution was not possible in this environment (no live LDAP); verified by build.
|
||||
|
||||
### IntegrationTests-007
|
||||
|
||||
|
||||
+20
-20
@@ -13,14 +13,14 @@ Each module's `findings.md` is the source of truth; this file is generated from
|
||||
| [Client.Dotnet](Client.Dotnet/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 5 | 8 |
|
||||
| [Client.Go](Client.Go/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 7 | 10 |
|
||||
| [Client.Java](Client.Java/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 7 | 12 |
|
||||
| [Client.Python](Client.Python/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 12 | 12 |
|
||||
| [Client.Python](Client.Python/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 9 | 12 |
|
||||
| [Client.Rust](Client.Rust/findings.md) | Claude Code | 2026-05-18 | `3cc53a8` | Reviewed | 0 | 12 |
|
||||
| [Contracts](Contracts/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 8 |
|
||||
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 10 |
|
||||
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 4 | 10 |
|
||||
| [Server](Server/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 14 |
|
||||
| [Tests](Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 10 | 12 |
|
||||
| [Tests](Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 6 | 12 |
|
||||
| [Worker](Worker/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 7 | 15 |
|
||||
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 13 | 15 |
|
||||
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-18 | `6c64030` | Reviewed | 8 | 15 |
|
||||
|
||||
## Pending findings
|
||||
|
||||
@@ -28,23 +28,7 @@ Findings with status `Open` or `In Progress`, ordered by severity.
|
||||
|
||||
| ID | Severity | Category | Location | Description |
|
||||
|---|---|---|---|---|
|
||||
| Client.Python-003 | Medium | Error handling & resilience | `clients/python/src/mxgateway/client.py:125-137,155-173` | `stream_events_raw` and `query_active_alarms` call the stub directly with a `timeout` kwarg when `stream_timeout` is set, with no `TypeError` fallback. `galaxy.py:watch_deploy_events` and `_unary` *do* have a fallback that strips `timeout`… |
|
||||
| Client.Python-005 | Medium | Performance & resource management | `clients/python/src/mxgateway/galaxy.py:117-140` | `discover_hierarchy` pages through the entire Galaxy object hierarchy and accumulates every `GalaxyObject` (each carrying its full attribute list) into a single in-memory `list` before returning. For a large Galaxy this is a very large all… |
|
||||
| Client.Python-009 | Medium | Testing coverage | `clients/python/tests/` | Several non-trivial public paths are untested: `Session.write2`/`add_item2` request construction; the bulk-size limit `_ensure_bulk_size`/`MAX_BULK_ITEMS` guard; the `None`-argument `TypeError` guards in bulk methods; the TLS `ca_file` rea… |
|
||||
| Contracts-002 | Medium | Error handling & resilience | `src/MxGateway.Contracts/Protos/mxaccess_gateway.proto:384-385`, `:95` | `MxCommandKind` includes `MX_COMMAND_KIND_ACKNOWLEDGE_ALARM_BY_NAME = 29` and `MxCommand.payload` carries `AcknowledgeAlarmByNameCommand acknowledge_alarm_by_name_command = 38`, but `MxCommandReply.payload` has only `acknowledge_alarm = 34… |
|
||||
| IntegrationTests-003 | Medium | Correctness & logic bugs | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` | The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registratio… |
|
||||
| IntegrationTests-004 | Medium | Error handling & resilience | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` | In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutExcep… |
|
||||
| IntegrationTests-005 | Medium | Testing coverage | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` | The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` sem… |
|
||||
| IntegrationTests-006 | Medium | Testing coverage | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` | LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticat… |
|
||||
| Tests-003 | Medium | Performance & resource management | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` | `CreateTempDatabasePath` creates a fresh directory under `%TEMP%\mxgateway-auth-tests\<guid>` (and `...-cli-tests`) for every test but nothing ever deletes it. `WorkerProcessLauncherTests.TestDirectory` correctly implements `IDisposable` a… |
|
||||
| Tests-004 | Medium | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` | The authorization interceptor and `MxAccessGatewayService` are each tested in isolation, but no test composes the interceptor in front of the real service to confirm scope enforcement gates real RPCs end-to-end. A wiring mistake — intercep… |
|
||||
| Tests-005 | Medium | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` | Worker-crash handling is only tested as a clean terminal exception from `ReadEventsAsync` or a pre-set `ShutdownException`. There is no test for a worker that faults mid-command — an `InvokeAsync` in flight when the pipe/worker dies — whic… |
|
||||
| Tests-006 | Medium | Concurrency & thread safety | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` | Several tests rely on fixed `Task.Delay` values: `WorkerClientTests.InvokeAsync_WithLateReply…` waits a hard-coded 50 ms after writing a late reply before issuing the second command, and the heartbeat tests use a 20 ms delay to make timest… |
|
||||
| Worker.Tests-003 | Medium | Concurrency & thread safety | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` | `InvokeAsync_WakesIdlePumpForQueuedCommand` asserts `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` — a wall-clock assertion that on a loaded CI agent can exceed 2s, producing a false failure. The test also does not actually prove the wake e… |
|
||||
| Worker.Tests-004 | Medium | Concurrency & thread safety | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` | `StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` use poll-until loops, and `Dispose_StopsAlarmPollLoop` additionally does `await Task.Delay(1000)` then asserts `PollCount` is unchanged. The… |
|
||||
| Worker.Tests-005 | Medium | Performance & resource management | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` | `MemoryStream` instances are created and never disposed across the frame-protocol and pipe-session tests (`MemoryStream stream = new();` with no `using`). Disposal is cheap so impact is low, but it is inconsistent with the rest of the suit… |
|
||||
| Worker.Tests-006 | Medium | Performance & resource management | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` | `Dispose_StopsAlarmPollLoop` constructs `MxAccessStaSession session` without `using` (unlike every sibling test) and relies on an explicit `session.Dispose()`. If an assertion between `StartAsync` and `Dispose()` throws, the session — its… |
|
||||
| Worker.Tests-007 | Medium | Design-document adherence | `docs/WorkerFrameProtocol.md:38-49` | `docs/WorkerFrameProtocol.md` instructs running `dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests` and states the frame protocol "is part of `MxGateway.Server`". The frame protocol actually lives in… |
|
||||
| Client.Dotnet-004 | Low | Error handling & resilience | `clients/dotnet/MxGateway.Client/MxGatewayClient.cs:283-294`, `clients/dotnet/MxGateway.Client/GalaxyRepositoryClient.cs:392-403` | `ExecuteSafeUnaryAsync` wraps the whole Polly retry pipeline in a single linked CTS cancelled after `Options.DefaultCallTimeout`, while `CreateCallOptions` also stamps each individual call with a `DefaultCallTimeout` gRPC deadline. The ret… |
|
||||
| Client.Dotnet-005 | Low | Correctness & logic bugs | `clients/dotnet/MxGateway.Client/MxGatewaySession.cs:82,124,175` | `RegisterAsync`/`AddItemAsync`/`AddItem2Async` return `reply.<Typed>?.ServerHandle ?? reply.ReturnValue.Int32Value`. After `EnsureMxAccessSuccess()` passes, a missing typed payload silently falls back to `ReturnValue.Int32Value`, which for… |
|
||||
| Client.Dotnet-006 | Low | Code organization & conventions | `clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs:50`, `clients/dotnet/MxGateway.Client/MxGatewayClientContractInfo.cs:10-14` | `MxGatewayClientOptions.MaxGrpcMessageBytes` and the two `const`s in `MxGatewayClientContractInfo` are public members with no XML doc comments, inconsistent with every other public member in the assembly and with the repo's documented C# s… |
|
||||
@@ -146,17 +130,33 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Client.Java-003 | Medium | Resolved | mxaccessgw conventions | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:119-140` |
|
||||
| Client.Java-004 | Medium | Resolved | Correctness & logic bugs | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:114-120,157-163,191-197` |
|
||||
| Client.Java-005 | Medium | Resolved | Error handling & resilience | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:92-105` |
|
||||
| Client.Python-003 | Medium | Resolved | Error handling & resilience | `clients/python/src/mxgateway/client.py:125-137,155-173` |
|
||||
| Client.Python-005 | Medium | Resolved | Performance & resource management | `clients/python/src/mxgateway/galaxy.py:117-140` |
|
||||
| Client.Python-009 | Medium | Resolved | Testing coverage | `clients/python/tests/` |
|
||||
| Client.Rust-005 | Medium | Resolved | Correctness & logic bugs | `clients/rust/src/session.rs:489-520` |
|
||||
| Client.Rust-006 | Medium | Resolved | Error handling & resilience | `clients/rust/src/session.rs:531-555` |
|
||||
| IntegrationTests-003 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` |
|
||||
| IntegrationTests-004 | Medium | Resolved | Error handling & resilience | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` |
|
||||
| IntegrationTests-005 | Medium | Resolved | Testing coverage | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` |
|
||||
| IntegrationTests-006 | Medium | Resolved | Testing coverage | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` |
|
||||
| Server-002 | Medium | Resolved | Design-document adherence | `src/MxGateway.Server/Program.cs:24`, `src/MxGateway.Server/GatewayApplication.cs` |
|
||||
| Server-004 | Medium | Resolved | Code organization & conventions | `src/MxGateway.Server/Security/Authentication/ApiKeyAdminCommandLineParser.cs:227-233`, `src/MxGateway.Server/Security/Authentication/ApiKeyAdminCliRunner.cs:53-77`, `src/MxGateway.Server/Dashboard/DashboardApiKeyManagementService.cs:21-67` |
|
||||
| Server-005 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Server/Galaxy/GalaxyHierarchyRefreshService.cs:22-28`, `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:184` |
|
||||
| Server-006 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Server/Sessions/SessionManager.cs:84-114` |
|
||||
| Tests-003 | Medium | Resolved | Performance & resource management | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` |
|
||||
| Tests-004 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` |
|
||||
| Tests-005 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` |
|
||||
| Tests-006 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` |
|
||||
| Worker-004 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:565-588` |
|
||||
| Worker-005 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:205-258` (production alarm poll loop) |
|
||||
| Worker-006 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:117-124`, `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:386-491` |
|
||||
| Worker-007 | Medium | Resolved | mxaccessgw conventions | `src/MxGateway.Worker/MxAccess/MxAccessComServer.cs:130-150` |
|
||||
| Worker-008 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:205-249`, `:429-447` |
|
||||
| Worker.Tests-003 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` |
|
||||
| Worker.Tests-004 | Medium | Resolved | Concurrency & thread safety | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` |
|
||||
| Worker.Tests-005 | Medium | Resolved | Performance & resource management | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` |
|
||||
| Worker.Tests-006 | Medium | Resolved | Performance & resource management | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` |
|
||||
| Worker.Tests-007 | Medium | Resolved | Design-document adherence | `docs/WorkerFrameProtocol.md:38-49` |
|
||||
| Client.Rust-004 | Low | Resolved | Documentation & comments | `clients/rust/src/version.rs:7` |
|
||||
| Client.Rust-007 | Low | Resolved | Design-document adherence | `clients/rust/RustClientDesign.md:14-55` |
|
||||
| Client.Rust-008 | Low | Resolved | Performance & resource management | `clients/rust/src/value.rs:161-261` |
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `6c64030` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 10 |
|
||||
| Open findings | 6 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -65,13 +65,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Performance & resource management |
|
||||
| Location | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `CreateTempDatabasePath` creates a fresh directory under `%TEMP%\mxgateway-auth-tests\<guid>` (and `...-cli-tests`) for every test but nothing ever deletes it. `WorkerProcessLauncherTests.TestDirectory` correctly implements `IDisposable` and cleans up; these two do not. SQLite connection pooling can also keep the `.db` handle open after the test. Over many CI runs this leaks temp files and open handles.
|
||||
|
||||
**Recommendation:** Wrap the temp directory in an `IDisposable`/`IAsyncDisposable` helper (as `WorkerProcessLauncherTests` does) and call `SqliteConnection.ClearAllPools()` before deletion, or use `Microsoft.Data.Sqlite` in-memory mode where a real file is not needed.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: confirmed root cause — both `CreateTempDatabasePath` helpers created `%TEMP%` directories with no cleanup, and `Microsoft.Data.Sqlite` pools connections by default so the `.db` handle outlives the test. Added a shared `TempDatabaseDirectory` (`src/MxGateway.Tests/Security/Authentication/TempDatabaseDirectory.cs`) `IDisposable` helper that calls `SqliteConnection.ClearAllPools()` and recursively deletes its directory. `SqliteAuthStoreTests` and `ApiKeyAdminCliRunnerTests` now implement `IDisposable`, track every directory created via `CreateTempDatabasePath`, and dispose them after each test. All affected tests still pass.
|
||||
|
||||
### Tests-004
|
||||
|
||||
@@ -80,13 +80,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The authorization interceptor and `MxAccessGatewayService` are each tested in isolation, but no test composes the interceptor in front of the real service to confirm scope enforcement gates real RPCs end-to-end. A wiring mistake — interceptor not registered, or a new RPC added without a scope mapping in `GatewayGrpcScopeResolver` — would pass every existing test. `GatewayGrpcScopeResolverTests` also only checks an enumerated allow-list; it never asserts an unmapped request type fails closed.
|
||||
|
||||
**Recommendation:** Add an end-to-end test that runs `OpenSession`/`Invoke` through the interceptor+service composition with insufficient scope and asserts `PermissionDenied`; add a `GatewayGrpcScopeResolver` test asserting an unknown/unmapped request type throws or denies rather than returning a permissive default.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: confirmed the coverage gap. Added three interceptor+service composition tests to `GatewayGrpcAuthorizationInterceptorTests` that run the real `GatewayGrpcAuthorizationInterceptor` continuation into a real `MxAccessGatewayService`: `InterceptorComposedWithService_OpenSessionMissingScope_DeniesBeforeServiceRuns` (asserts `PermissionDenied` and `OpenSessionCount == 0`), `InterceptorComposedWithService_OpenSessionWithScope_RunsServiceWithIdentity` (service runs and observes the interceptor-pushed identity), and `InterceptorComposedWithService_InvokeWriteCommandWithReadScope_DeniesBeforeServiceRuns` (a `Write` command with only `invoke:read` is denied). Added two `GatewayGrpcScopeResolverTests`: `ResolveRequiredScope_UnmappedRequestType_FailsClosedToAdminScope` confirms an unmapped request type resolves to the most-restrictive `Admin` scope (the resolver's `_ => GatewayScopes.Admin` default already fails closed — no product bug), and `ResolveRequiredScope_UnknownInvokeCommandKind_ReturnsInvokeReadScope` confirms an unknown command kind does not silently grant write/admin access.
|
||||
|
||||
### Tests-005
|
||||
|
||||
@@ -95,13 +95,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Worker-crash handling is only tested as a clean terminal exception from `ReadEventsAsync` or a pre-set `ShutdownException`. There is no test for a worker that faults mid-command — an `InvokeAsync` in flight when the pipe/worker dies — which is a core fault-handling path of the two-process design. `WorkerClientTests` covers pipe-disconnect faulting the read loop, but not the interaction where a pending `InvokeAsync` task observes the fault and surfaces a meaningful error code.
|
||||
|
||||
**Recommendation:** Add a `WorkerClient`/`SessionManager` test that disposes the worker pipe (or emits a `WorkerFault`) while an `InvokeAsync` is pending, and assert the invoke task fails with a `WorkerClientException`/`SessionManagerException` carrying the worker-faulted error code.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: confirmed the coverage gap and confirmed the product path already handles it correctly (`WorkerClient.ReadLoopAsync` → `SetFaulted` → `CompletePendingCommands(fault)` fails every pending command with the fault exception). Added two `WorkerClientTests`: `InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected` (worker reads the command then disposes its pipe side; the pending invoke task fails with `WorkerClientErrorCode.PipeDisconnected`) and `InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted` (worker emits a `WorkerFault` envelope while the invoke is pending; the task fails with `WorkerClientErrorCode.WorkerFaulted`). Both also assert the client transitions to `Faulted`. No product change needed.
|
||||
|
||||
### Tests-006
|
||||
|
||||
@@ -110,13 +110,15 @@
|
||||
| Severity | Medium |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:76`, `src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs:122` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Several tests rely on fixed `Task.Delay` values: `WorkerClientTests.InvokeAsync_WithLateReply…` waits a hard-coded 50 ms after writing a late reply before issuing the second command, and the heartbeat tests use a 20 ms delay to make timestamps strictly increase. On a slow CI agent the 50 ms delay can be insufficient, and `DateTimeOffset.UtcNow` resolution can make the 20 ms heartbeat-advance assertion flaky.
|
||||
|
||||
**Recommendation:** Replace fixed delays with the existing `WaitUntilAsync` condition polling, and inject a controllable `TimeProvider` for heartbeat-timestamp comparisons instead of relying on wall-clock advance.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Re-triage note:** The brief flagged `ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess` as "a real `WorkerClient` fault→kill bug". On inspection it is **not a product bug** — it is a test race. `WorkerClient.SetFaulted` publishes the `Faulted` state under lock *before* calling `KillOwnedProcess`, so the old test's `WaitUntilAsync(() => client.State == Faulted)` could return between those two statements and observe `process.KillCount == 0`. The kill itself always runs synchronously inside `SetFaulted`, and `ShutdownAsync`/`DisposeAsync` re-issue an idempotent kill, so no real consumer relies on "state==Faulted implies process dead". The fix is therefore a test-quality fix (correctly Medium / Concurrency), not a product fix.
|
||||
|
||||
**Resolution:** Resolved 2026-05-18: (1) Made `ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess` deterministic — it now `await`s `FakeWorkerProcess.WaitForExitAsync` (the `TaskCompletionSource` completed inside `Kill()`), which completes exactly when the kill runs, eliminating the state-polling race; verified by running it five times in isolation (5/5 pass). (2) Removed the fixed 50 ms `Task.Delay` from `InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady` — the stale reply and the second reply are now sent in pipe (FIFO) order, so the read loop discards the stale reply before the second reply with no timing window. (3) Replaced the 20 ms `Task.Delay` heartbeat-advance hacks in `WorkerClientTests.ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess` and `FakeWorkerHarnessTests.SendHeartbeatAsync_UpdatesClientHeartbeatState` with an injected `ManualTimeProvider` advanced by a fixed `TimeSpan`; both tests now assert the exact post-advance timestamp instead of `>` against wall-clock drift.
|
||||
|
||||
### Tests-007
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `6c64030` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 13 |
|
||||
| Open findings | 8 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `src/MxGateway.Worker.Tests/Sta/StaRuntimeTests.cs:46-48` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `InvokeAsync_WakesIdlePumpForQueuedCommand` asserts `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` — a wall-clock assertion that on a loaded CI agent can exceed 2s, producing a false failure. The test also does not actually prove the wake event (vs the 50 ms idle pump) caused the dispatch.
|
||||
|
||||
**Recommendation:** Remove the wall-clock assertion (the awaited result already proves the command ran), or raise the budget substantially with a comment that it is a coarse smoke check.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Removed the `Stopwatch` and the `stopwatch.Elapsed < TimeSpan.FromSeconds(2)` wall-clock assertion from `InvokeAsync_WakesIdlePumpForQueuedCommand`. The test already constructs the `StaRuntime` with a 30-second idle pump period, so the awaited `InvokeAsync` completing at all proves the command wake event — not the idle pump tick — drove the dispatch; no timing budget is needed. The XML-doc comment now states this explicitly. The now-unused `using System.Diagnostics;` was removed (`TreatWarningsAsErrors`).
|
||||
|
||||
### Worker.Tests-004
|
||||
|
||||
@@ -78,13 +78,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:281-329` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` use poll-until loops, and `Dispose_StopsAlarmPollLoop` additionally does `await Task.Delay(1000)` then asserts `PollCount` is unchanged. The 1s "no further polls" window is a timing race: a poll scheduled just before disposal could increment the counter afterward, and a slow agent could simply not run a poll in the window even without correct stop logic.
|
||||
|
||||
**Recommendation:** Make the poll loop deterministically observable — expose a "poll loop stopped" signal or have `Dispose` join the poll task — then assert on that rather than on elapsed-time silence.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — `MxAccessStaSession.Dispose` now joins the alarm poll task (`pollTaskToJoin.Wait(TimeSpan.FromSeconds(5))`) after cancelling the poll CTS, instead of setting `alarmPollTask = null` and discarding it. Once `Dispose` returns, the poll loop has provably exited and no `PollOnce` call can still be in flight. `Dispose_StopsAlarmPollLoop` was rewritten to drop the `await Task.Delay(1000)` "no further polls" window: it now captures `PollCount` immediately after `Dispose()` returns and re-asserts equality after a bare `await Task.Yield()` — a deterministic frozen-count check rather than an elapsed-time race. The success-direction poll-until loop in `PollOnceCalledViaSta` was left as-is: waiting for an event to *occur* is sound; only waiting for an event to *not* occur is the race, and that pattern is now eliminated. Note: `ShutdownGracefullyAsync` already joined the poll task, so this change makes `Dispose` consistent with the graceful path.
|
||||
|
||||
### Worker.Tests-005
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Performance & resource management |
|
||||
| Location | `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs:20-31,103-105`, `src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs:28-31` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `MemoryStream` instances are created and never disposed across the frame-protocol and pipe-session tests (`MemoryStream stream = new();` with no `using`). Disposal is cheap so impact is low, but it is inconsistent with the rest of the suite (which carefully `using`s `CancellationTokenSource`, `StaRuntime`, `PipePair`). `WorkerFrameWriter`/`WorkerFrameReader` are also constructed without disposal.
|
||||
|
||||
**Recommendation:** Wrap `MemoryStream` (and reader/writer if they are `IDisposable`) in `using` declarations for consistency.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — All six `MemoryStream` test-body declarations in `WorkerFrameProtocolTests.cs` and the five `inbound`/`outbound` `MemoryStream` declarations in the `WorkerPipeSessionTests.cs` handshake tests were converted to `using` declarations, matching how the rest of the suite handles `CancellationTokenSource`/`StaRuntime`/`PipePair`. Re-triage of the parenthetical: `WorkerFrameWriter` and `WorkerFrameReader` are **not** `IDisposable` (`sealed class` with no `IDisposable` and no `Dispose` member — verified in `src/MxGateway.Worker/Ipc/`), so the finding's "reader/writer if they are `IDisposable`" suggestion does not apply and no change was made there. The shared `MemoryStream` instances inside the `WorkerPipeSessionTests` harness/helper classes (`ReadWrittenFrames` parameter, the `PipePair`/harness fields) are out of the cited line scope and were left untouched.
|
||||
|
||||
### Worker.Tests-006
|
||||
|
||||
@@ -108,13 +108,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Performance & resource management |
|
||||
| Location | `src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs:282,305,315,323` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `Dispose_StopsAlarmPollLoop` constructs `MxAccessStaSession session` without `using` (unlike every sibling test) and relies on an explicit `session.Dispose()`. If an assertion between `StartAsync` and `Dispose()` throws, the session — its STA thread and poll loop — leaks for the rest of the run. The `StaRuntime` is `using`d so the thread is eventually reclaimed, but the alarm poll loop and handler are not.
|
||||
|
||||
**Recommendation:** Use `using MxAccessStaSession session = ...` and drop the manual `Dispose()`, or wrap the body in try/finally.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — `Dispose_StopsAlarmPollLoop` now declares its `MxAccessStaSession` with a `using` declaration. The manual `session.Dispose()` is kept because the test's purpose is to observe poll behaviour across disposal — but `MxAccessStaSession.Dispose` is idempotent (guarded by the `disposed` field), so the explicit mid-test call and the `using`-scope call do not conflict. An assertion thrown anywhere in the body now still tears the session (STA poll loop + alarm handler) down. The cited line numbers in the finding were imprecise — they straddle `PollOnceCalledViaSta` and `Dispose_StopsAlarmPollLoop` — but the described root cause (one `MxAccessStaSession` constructed without `using`) was singular and is the one in `Dispose_StopsAlarmPollLoop`; the sibling tests `PollOnceCalledViaSta` and `RunAlarmPollLoop_WhenPollOnceThrows_RecordsFaultOnEventQueue` already used `using` and needed no change.
|
||||
|
||||
### Worker.Tests-007
|
||||
|
||||
@@ -123,13 +123,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Design-document adherence |
|
||||
| Location | `docs/WorkerFrameProtocol.md:38-49` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `docs/WorkerFrameProtocol.md` instructs running `dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests` and states the frame protocol "is part of `MxGateway.Server`". The frame protocol actually lives in `MxGateway.Worker.Ipc` and is tested by `src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs`. The doc's verification command points at the wrong project and build, so anyone following it after changing the worker frame protocol will not run the relevant tests.
|
||||
|
||||
**Recommendation:** Update `docs/WorkerFrameProtocol.md` to reference `src/MxGateway.Worker.Tests` and the x86 worker build (`-p:Platform=x86`).
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Rewrote the `## Verification` section of `docs/WorkerFrameProtocol.md`. The test command now targets `src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter WorkerFrameProtocolTests`; the build command now targets `src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86`. The prose now states the frame protocol lives in `MxGateway.Worker.Ipc` (naming `WorkerFrameReader`/`WorkerFrameWriter`/`WorkerFrameProtocolOptions` and the `WorkerFrameProtocolTests.cs` test file) and notes the worker is an x86 process. Verified against the source: the frame-protocol types are confirmed under `src/MxGateway.Worker/Ipc/` and the tests under `src/MxGateway.Worker.Tests/Ipc/`, so the original doc was wrong on both project and component. Fenced code blocks were also relabelled `powershell` (the build/test commands are run from PowerShell on this Windows dev box).
|
||||
|
||||
### Worker.Tests-008
|
||||
|
||||
|
||||
+22
-3
@@ -44,9 +44,22 @@ skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
|
||||
the installed MXAccess COM object and depends on live provider state.
|
||||
|
||||
The live smoke opens a gateway session, launches the x86 worker, runs
|
||||
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
|
||||
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
|
||||
graceful shutdown request even when a command or event assertion fails.
|
||||
`Register`, `AddItem`, and `Advise`, waits a bounded time for the first
|
||||
`OnDataChange` event (skipping any earlier bootstrap/registration-state event),
|
||||
and closes the session in a `finally` block so the worker gets a graceful
|
||||
shutdown request even when a command or event assertion fails. Cleanup failures
|
||||
in that `finally` block are logged rather than thrown, so a real assertion
|
||||
failure is never masked by a shutdown timeout.
|
||||
|
||||
`WorkerLiveMxAccessSmokeTests` additionally covers two MXAccess parity paths the
|
||||
fake-worker tests cannot validate:
|
||||
|
||||
- a `Write` round-trip against an advised item, and
|
||||
- an `AddItem` against an invalid server handle, asserting the MXAccess failure
|
||||
surfaces in the command reply without faulting the gateway transport.
|
||||
|
||||
All three tests are gated by the same `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1`
|
||||
opt-in variable.
|
||||
|
||||
Build the worker before running the smoke:
|
||||
|
||||
@@ -119,6 +132,12 @@ GLAuth has only the baseline groups, so this is a hard prerequisite beyond "LDAP
|
||||
is up." See the "Adding a gw-specific group" section of `glauth.md` for the
|
||||
provisioning step that adds `GwAdmin` and grants it to `admin`.
|
||||
|
||||
The suite covers both the success path and the `DashboardAuthenticator` failure
|
||||
branches: `admin` in `GwAdmin` succeeds; `readonly` is denied for missing group;
|
||||
`admin` with a wrong password is rejected by the candidate bind without leaking
|
||||
the password into `FailureMessage`; an unknown username yields no candidate; and
|
||||
an unreachable LDAP server is absorbed into a failed result rather than throwing.
|
||||
|
||||
Run the LDAP live tests explicitly:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -35,17 +35,22 @@ oversized frames, protocol version mismatches, and session mismatches.
|
||||
|
||||
## Verification
|
||||
|
||||
The frame protocol lives in `MxGateway.Worker.Ipc` (`WorkerFrameReader`,
|
||||
`WorkerFrameWriter`, `WorkerFrameProtocolOptions`) and is covered by
|
||||
`src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs`. The worker is an
|
||||
x86 process, so build and test it with `-p:Platform=x86`.
|
||||
|
||||
Run the focused tests after changing the frame protocol:
|
||||
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
|
||||
```powershell
|
||||
dotnet test src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter WorkerFrameProtocolTests
|
||||
```
|
||||
|
||||
Run the gateway build because the frame protocol is part of
|
||||
`MxGateway.Server`:
|
||||
Run the x86 worker build because the frame protocol is part of
|
||||
`MxGateway.Worker`:
|
||||
|
||||
```bash
|
||||
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
|
||||
```powershell
|
||||
dotnet build src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86
|
||||
```
|
||||
|
||||
## Related Documentation
|
||||
|
||||
@@ -43,6 +43,69 @@ public sealed class DashboardLdapLiveTests
|
||||
Assert.DoesNotContain("readonly123", result.FailureMessage, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword()
|
||||
{
|
||||
// Exercises the LdapException branch: the user exists and the service
|
||||
// account search succeeds, but the candidate bind is rejected.
|
||||
const string wrongPassword = "definitely-not-the-admin-password";
|
||||
DashboardAuthenticator authenticator = CreateAuthenticator();
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"admin",
|
||||
wrongPassword,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
Assert.DoesNotContain(wrongPassword, result.FailureMessage, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_UnknownUsername_Fails()
|
||||
{
|
||||
// Exercises the `candidate is null` branch: the service-account search
|
||||
// returns no entry, so no candidate bind is attempted.
|
||||
DashboardAuthenticator authenticator = CreateAuthenticator();
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"no-such-user-9f3c1",
|
||||
"irrelevant-password",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing()
|
||||
{
|
||||
// Exercises the connect-failure path: a closed loopback port produces a
|
||||
// connection error that DashboardAuthenticator must absorb into a Fail
|
||||
// result rather than propagating an exception to the dashboard.
|
||||
DashboardAuthenticator authenticator = new(
|
||||
Options.Create(new GatewayOptions
|
||||
{
|
||||
Ldap = new LdapOptions
|
||||
{
|
||||
// 1 is a reserved port number that no LDAP server listens on.
|
||||
Port = 1,
|
||||
},
|
||||
}),
|
||||
NullLogger<DashboardAuthenticator>.Instance);
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"admin",
|
||||
"admin123",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
}
|
||||
|
||||
private static DashboardAuthenticator CreateAuthenticator()
|
||||
{
|
||||
return new DashboardAuthenticator(
|
||||
|
||||
@@ -86,8 +86,15 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
LogReply("Advise", adviseReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
|
||||
// A live MXAccess provider can deliver an initial registration-state
|
||||
// or bad-quality bootstrap event before the OnDataChange the worker
|
||||
// is contracted to emit. Match on the family rather than trusting
|
||||
// whatever event arrives first so a genuine ordering defect cannot
|
||||
// pass spuriously or leave a later wrong event unverified.
|
||||
MxEvent dataChange = await eventWriter
|
||||
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||
.WaitForMessageAsync(
|
||||
candidate => candidate.Family == MxEventFamily.OnDataChange,
|
||||
IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||
.ConfigureAwait(false);
|
||||
LogEvent(dataChange);
|
||||
|
||||
@@ -98,22 +105,184 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||
}
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (streamTask is not null)
|
||||
/// <summary>
|
||||
/// Verifies that a Write command round-trips through live MXAccess against an advised item.
|
||||
/// </summary>
|
||||
[LiveMxAccessFact]
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public async Task GatewaySession_WithLiveWorker_WritesValueToAdvisedItem()
|
||||
{
|
||||
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||
Assert.True(
|
||||
File.Exists(workerExecutablePath),
|
||||
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||
|
||||
TestWorkerProcessFactory processFactory = new(output);
|
||||
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||
|
||||
string? sessionId = null;
|
||||
Task? streamTask = null;
|
||||
|
||||
try
|
||||
{
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
ClientSessionName = "live-mxaccess-write",
|
||||
ClientCorrelationId = "live-open-write",
|
||||
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
sessionId = openReply.SessionId;
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
|
||||
RecordingServerStreamWriter<MxEvent> eventWriter = new();
|
||||
streamTask = fixture.Service.StreamEvents(
|
||||
new StreamEventsRequest { SessionId = sessionId },
|
||||
eventWriter,
|
||||
new TestServerCallContext());
|
||||
|
||||
MxCommandReply registerReply = await fixture.Service.Invoke(
|
||||
CreateRegisterRequest(sessionId),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Register", registerReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
|
||||
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("AddItem", addItemReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.True(addItemReply.AddItem.ItemHandle > 0);
|
||||
|
||||
MxCommandReply adviseReply = await fixture.Service.Invoke(
|
||||
CreateAdviseRequest(
|
||||
sessionId,
|
||||
registerReply.Register.ServerHandle,
|
||||
addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Advise", adviseReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
|
||||
MxCommandReply writeReply = await fixture.Service.Invoke(
|
||||
CreateWriteRequest(
|
||||
sessionId,
|
||||
registerReply.Register.ServerHandle,
|
||||
addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Write", writeReply);
|
||||
|
||||
// The gateway must always report a protocol-level status. MXAccess
|
||||
// parity details (a write rejection, a secured-item failure) belong
|
||||
// in hresult / statuses, not in a transport failure — the command
|
||||
// itself completed its round-trip to the worker and back.
|
||||
Assert.Equal(ProtocolStatusCode.Ok, writeReply.ProtocolStatus.Code);
|
||||
Assert.Equal(MxCommandKind.Write, writeReply.Kind);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that an AddItem against an invalid server handle surfaces the MXAccess failure
|
||||
/// without faulting the gateway transport, exercising the invalid-handle parity path.
|
||||
/// </summary>
|
||||
[LiveMxAccessFact]
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public async Task GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault()
|
||||
{
|
||||
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||
Assert.True(
|
||||
File.Exists(workerExecutablePath),
|
||||
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||
|
||||
TestWorkerProcessFactory processFactory = new(output);
|
||||
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||
|
||||
string? sessionId = null;
|
||||
|
||||
try
|
||||
{
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
ClientSessionName = "live-mxaccess-invalid-handle",
|
||||
ClientCorrelationId = "live-open-invalid",
|
||||
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
sessionId = openReply.SessionId;
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
|
||||
// Deliberately skip Register: server handle 0x7FFFFFFF was never
|
||||
// issued by MXAccess. The worker must invoke COM and relay the
|
||||
// invalid-handle failure rather than the gateway short-circuiting.
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(sessionId, serverHandle: int.MaxValue),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("AddItem(invalid-handle)", addItemReply);
|
||||
|
||||
// MXAccess parity: an invalid handle is an MXAccess-level failure.
|
||||
// The command still completed its worker round-trip, so the gateway
|
||||
// protocol status is Ok and the failure shows up in hresult / the
|
||||
// status proxies — it must not be reported as a transport fault.
|
||||
Assert.NotEqual(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.True(
|
||||
addItemReply.AddItem is null || addItemReply.AddItem.ItemHandle <= 0,
|
||||
"Invalid-handle AddItem must not yield a usable item handle.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Closes the session and drains the event stream / worker processes without letting a
|
||||
/// cleanup timeout mask the original failure from the test body.
|
||||
/// </summary>
|
||||
private async Task ShutDownAsync(
|
||||
GatewayServiceFixture fixture,
|
||||
TestWorkerProcessFactory processFactory,
|
||||
string? sessionId,
|
||||
Task? streamTask)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (streamTask is not null)
|
||||
{
|
||||
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Cleanup runs in a finally block. A TimeoutException (or a faulted
|
||||
// StreamEvents task) here would otherwise replace any assertion
|
||||
// failure raised in the try block. Log it and let the original
|
||||
// failure surface.
|
||||
output.WriteLine($"Cleanup error during session/stream shutdown: {ex}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
output.WriteLine($"Cleanup error while waiting for worker processes to exit: {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,6 +344,32 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateWriteRequest(
|
||||
string sessionId,
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-write",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write,
|
||||
Write = new WriteCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
Value = new MxValue
|
||||
{
|
||||
DataType = MxDataType.Integer,
|
||||
Int32Value = 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private async Task CloseSessionAsync(
|
||||
GatewayServiceFixture fixture,
|
||||
string sessionId)
|
||||
@@ -321,8 +516,8 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
|
||||
{
|
||||
private readonly object syncRoot = new();
|
||||
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly List<T> messages = [];
|
||||
private readonly SemaphoreSlim messageArrived = new(0);
|
||||
|
||||
/// <summary>
|
||||
/// All messages that have been written to the stream.
|
||||
@@ -344,7 +539,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
public WriteOptions? WriteOptions { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Records the message and completes the first-message task.
|
||||
/// Records the message and signals any pending waiter.
|
||||
/// </summary>
|
||||
/// <param name="message">The message to write.</param>
|
||||
public Task WriteAsync(T message)
|
||||
@@ -354,18 +549,51 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
messages.Add(message);
|
||||
}
|
||||
|
||||
firstMessage.TrySetResult(message);
|
||||
messageArrived.Release();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits for the first message up to the specified timeout.
|
||||
/// Waits for the first recorded message that satisfies <paramref name="predicate"/>,
|
||||
/// up to the specified timeout. Earlier non-matching messages (for example a
|
||||
/// registration-state bootstrap event) are skipped rather than treated as the result.
|
||||
/// </summary>
|
||||
/// <param name="timeout">The maximum time to wait.</param>
|
||||
/// <returns>The first message written to the stream.</returns>
|
||||
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
|
||||
/// <param name="predicate">Filter the awaited message must satisfy.</param>
|
||||
/// <param name="timeout">The maximum total time to wait.</param>
|
||||
/// <returns>The first message that satisfies the predicate.</returns>
|
||||
public async Task<T> WaitForMessageAsync(
|
||||
Func<T, bool> predicate,
|
||||
TimeSpan timeout)
|
||||
{
|
||||
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
|
||||
using CancellationTokenSource timeoutCancellation = new(timeout);
|
||||
int scanned = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
T[] snapshot;
|
||||
lock (syncRoot)
|
||||
{
|
||||
snapshot = messages.ToArray();
|
||||
}
|
||||
|
||||
for (; scanned < snapshot.Length; scanned++)
|
||||
{
|
||||
if (predicate(snapshot[scanned]))
|
||||
{
|
||||
return snapshot[scanned];
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await messageArrived.WaitAsync(timeoutCancellation.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (timeoutCancellation.IsCancellationRequested)
|
||||
{
|
||||
throw new TimeoutException(
|
||||
$"No stream message satisfied the predicate within {timeout}. Recorded {scanned} message(s).");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -110,16 +110,21 @@ public sealed class FakeWorkerHarnessTests
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that sending a heartbeat updates the client heartbeat state.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that sending a heartbeat updates the client heartbeat state. Uses a
|
||||
/// <see cref="ManualTimeProvider"/> so the timestamp advance is deterministic rather
|
||||
/// than relying on a wall-clock <c>Task.Delay</c> exceeding clock resolution.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
|
||||
{
|
||||
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
|
||||
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
||||
await using WorkerClient client = fakeWorker.CreateClient();
|
||||
await using WorkerClient client = fakeWorker.CreateClient(timeProvider: clock);
|
||||
await StartClientAsync(fakeWorker, client);
|
||||
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
clock.Advance(TimeSpan.FromSeconds(1));
|
||||
await fakeWorker.SendHeartbeatAsync(
|
||||
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
|
||||
|
||||
@@ -128,6 +133,7 @@ public sealed class FakeWorkerHarnessTests
|
||||
TestTimeout);
|
||||
|
||||
Assert.Equal(WorkerClientState.Ready, client.State);
|
||||
Assert.Equal(previousHeartbeat + TimeSpan.FromSeconds(1), client.LastHeartbeatAt);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that a hung worker times out pending command invocations.</summary>
|
||||
@@ -215,4 +221,17 @@ public sealed class FakeWorkerHarnessTests
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Time provider with a manually advanced clock for deterministic timestamp tests.</summary>
|
||||
private sealed class ManualTimeProvider(DateTimeOffset start) : TimeProvider
|
||||
{
|
||||
private DateTimeOffset _now = start;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override DateTimeOffset GetUtcNow() => _now;
|
||||
|
||||
/// <summary>Advances the manual clock by the given amount.</summary>
|
||||
/// <param name="delta">Amount of time to add to the current clock value.</param>
|
||||
public void Advance(TimeSpan delta) => _now += delta;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,9 +71,11 @@ public sealed class WorkerClientTests
|
||||
async () => await timedOutInvokeTask);
|
||||
Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
|
||||
|
||||
// Send the stale reply for the already-timed-out command, then the second
|
||||
// command's reply. The pipe is FIFO, so the read loop processes (and discards)
|
||||
// the stale reply before the second reply — no fixed Task.Delay needed.
|
||||
await pipePair.WorkerWriter.WriteAsync(
|
||||
CreateCommandReplyEnvelope(timedOutCommand.CorrelationId, MxCommandKind.Ping));
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(50));
|
||||
|
||||
Task<WorkerCommandReply> secondInvokeTask = client.InvokeAsync(
|
||||
CreateCommand(MxCommandKind.GetWorkerInfo),
|
||||
@@ -142,7 +144,14 @@ public sealed class WorkerClientTests
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that the read loop faults the client when the pipe disconnects.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that when the client faults it kills the owned worker process.
|
||||
/// The assertion waits on <see cref="FakeWorkerProcess.WaitForExitAsync"/>, which
|
||||
/// completes exactly when <c>Kill</c> runs, instead of polling <c>client.State</c>.
|
||||
/// Polling state is racy: <see cref="WorkerClient.SetFaulted"/> publishes the
|
||||
/// <c>Faulted</c> state before it calls <c>KillOwnedProcess</c>, so a state-based
|
||||
/// wait can observe <c>Faulted</c> while <c>KillCount</c> is still 0.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess()
|
||||
{
|
||||
@@ -164,15 +173,77 @@ public sealed class WorkerClientTests
|
||||
await pipePair.WorkerWriter.WriteAsync(
|
||||
CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));
|
||||
|
||||
await WaitUntilAsync(
|
||||
() => client.State == WorkerClientState.Faulted,
|
||||
TestTimeout);
|
||||
// Deterministic: this completes the instant Kill() runs, with no timing window.
|
||||
using CancellationTokenSource exitTimeout = new(TestTimeout);
|
||||
await process.WaitForExitAsync(exitTimeout.Token);
|
||||
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
Assert.Equal(1, process.KillCount);
|
||||
Assert.True(process.KillEntireProcessTree);
|
||||
Assert.True(process.HasExited);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a worker faulting mid-command — the pipe dropping while an
|
||||
/// <see cref="WorkerClient.InvokeAsync"/> is still pending — completes the pending
|
||||
/// invoke task with a <see cref="WorkerClientException"/> carrying the
|
||||
/// pipe-disconnected error code rather than hanging until the command timeout.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected()
|
||||
{
|
||||
await using PipePair pipePair = await PipePair.CreateAsync();
|
||||
await using WorkerClient client = CreateClient(pipePair);
|
||||
await CompleteHandshakeAsync(client, pipePair);
|
||||
|
||||
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
|
||||
CreateCommand(MxCommandKind.Ping),
|
||||
TestTimeout,
|
||||
CancellationToken.None);
|
||||
|
||||
// The worker received the command but disconnects before replying.
|
||||
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
|
||||
await pipePair.DisposeWorkerSideAsync();
|
||||
|
||||
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
|
||||
async () => await invokeTask.WaitAsync(TestTimeout));
|
||||
|
||||
Assert.Equal(WorkerClientErrorCode.PipeDisconnected, exception.ErrorCode);
|
||||
await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a worker emitting a <c>WorkerFault</c> envelope while an
|
||||
/// <see cref="WorkerClient.InvokeAsync"/> is pending completes the pending invoke
|
||||
/// task with a <see cref="WorkerClientException"/> carrying the worker-faulted
|
||||
/// error code.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted()
|
||||
{
|
||||
await using PipePair pipePair = await PipePair.CreateAsync();
|
||||
await using WorkerClient client = CreateClient(pipePair);
|
||||
await CompleteHandshakeAsync(client, pipePair);
|
||||
|
||||
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
|
||||
CreateCommand(MxCommandKind.Ping),
|
||||
TestTimeout,
|
||||
CancellationToken.None);
|
||||
|
||||
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
|
||||
await pipePair.WorkerWriter.WriteAsync(CreateWorkerFaultEnvelope("scripted mid-command fault"));
|
||||
|
||||
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
|
||||
async () => await invokeTask.WaitAsync(TestTimeout));
|
||||
|
||||
Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);
|
||||
await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
|
||||
{
|
||||
@@ -244,15 +315,22 @@ public sealed class WorkerClientTests
|
||||
Assert.True(process.Disposed);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a heartbeat envelope updates the last-heartbeat timestamp and worker
|
||||
/// process id. Uses a <see cref="ManualTimeProvider"/> so the timestamp advance is
|
||||
/// deterministic instead of relying on a wall-clock <c>Task.Delay</c> exceeding
|
||||
/// <see cref="DateTimeOffset.UtcNow"/> resolution.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
|
||||
{
|
||||
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
|
||||
await using PipePair pipePair = await PipePair.CreateAsync();
|
||||
await using WorkerClient client = CreateClient(pipePair);
|
||||
await using WorkerClient client = CreateClient(pipePair, timeProvider: clock);
|
||||
await CompleteHandshakeAsync(client, pipePair);
|
||||
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
clock.Advance(TimeSpan.FromSeconds(1));
|
||||
await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));
|
||||
|
||||
await WaitUntilAsync(
|
||||
@@ -260,6 +338,7 @@ public sealed class WorkerClientTests
|
||||
TestTimeout);
|
||||
|
||||
Assert.Equal(WorkerClientState.Ready, client.State);
|
||||
Assert.Equal(previousHeartbeat + TimeSpan.FromSeconds(1), client.LastHeartbeatAt);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that the heartbeat monitor faults the client when the heartbeat expires.</summary>
|
||||
@@ -288,7 +367,8 @@ public sealed class WorkerClientTests
|
||||
PipePair pipePair,
|
||||
WorkerClientOptions? options = null,
|
||||
GatewayMetrics? metrics = null,
|
||||
WorkerProcessHandle? processHandle = null)
|
||||
WorkerProcessHandle? processHandle = null,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
WorkerFrameProtocolOptions frameOptions = new(SessionId);
|
||||
WorkerClientConnection connection = new(
|
||||
@@ -298,7 +378,7 @@ public sealed class WorkerClientTests
|
||||
frameOptions,
|
||||
processHandle);
|
||||
|
||||
return new WorkerClient(connection, options, metrics);
|
||||
return new WorkerClient(connection, options, metrics, timeProvider);
|
||||
}
|
||||
|
||||
private static WorkerProcessHandle CreateProcessHandle(FakeWorkerProcess process)
|
||||
@@ -399,6 +479,23 @@ public sealed class WorkerClientTests
|
||||
});
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateWorkerFaultEnvelope(string diagnosticMessage)
|
||||
{
|
||||
return CreateWorkerEnvelope(
|
||||
correlationId: string.Empty,
|
||||
sequence: 30,
|
||||
envelope => envelope.WorkerFault = new WorkerFault
|
||||
{
|
||||
Category = WorkerFaultCategory.MxaccessCommandFailed,
|
||||
DiagnosticMessage = diagnosticMessage,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.WorkerUnavailable,
|
||||
Message = diagnosticMessage,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
|
||||
{
|
||||
return CreateWorkerEnvelope(
|
||||
@@ -509,6 +606,19 @@ public sealed class WorkerClientTests
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Time provider with a manually advanced clock for deterministic timestamp tests.</summary>
|
||||
private sealed class ManualTimeProvider(DateTimeOffset start) : TimeProvider
|
||||
{
|
||||
private DateTimeOffset _now = start;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override DateTimeOffset GetUtcNow() => _now;
|
||||
|
||||
/// <summary>Advances the manual clock by the given amount.</summary>
|
||||
/// <param name="delta">Amount of time to add to the current clock value.</param>
|
||||
public void Advance(TimeSpan delta) => _now += delta;
|
||||
}
|
||||
|
||||
private sealed class FakeWorkerProcess : IWorkerProcess
|
||||
{
|
||||
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
@@ -6,8 +6,9 @@ using MxGateway.Server.Security.Authentication;
|
||||
|
||||
namespace MxGateway.Tests.Security.Authentication;
|
||||
|
||||
public sealed class ApiKeyAdminCliRunnerTests
|
||||
public sealed class ApiKeyAdminCliRunnerTests : IDisposable
|
||||
{
|
||||
private readonly List<TempDatabaseDirectory> _tempDirectories = [];
|
||||
/// <summary>Verifies that CreateKeyAsync creates an authenticating key and audits the action.</summary>
|
||||
[Fact]
|
||||
public async Task CreateKeyAsync_CreatesAuthenticatingKeyAndAudits()
|
||||
@@ -249,12 +250,23 @@ public sealed class ApiKeyAdminCliRunnerTests
|
||||
return services.BuildServiceProvider(validateScopes: true);
|
||||
}
|
||||
|
||||
private static string CreateTempDatabasePath()
|
||||
/// <summary>Clears SQLite pools and deletes every temporary directory created by this test.</summary>
|
||||
public void Dispose()
|
||||
{
|
||||
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-cli-tests", Guid.NewGuid().ToString("N"));
|
||||
Directory.CreateDirectory(directory);
|
||||
foreach (TempDatabaseDirectory directory in _tempDirectories)
|
||||
{
|
||||
directory.Dispose();
|
||||
}
|
||||
|
||||
return Path.Combine(directory, "gateway-auth.db");
|
||||
_tempDirectories.Clear();
|
||||
}
|
||||
|
||||
private string CreateTempDatabasePath()
|
||||
{
|
||||
TempDatabaseDirectory directory = TempDatabaseDirectory.Create("mxgateway-auth-cli-tests");
|
||||
_tempDirectories.Add(directory);
|
||||
|
||||
return directory.DatabasePath();
|
||||
}
|
||||
|
||||
private static string ReadApiKey(string json)
|
||||
|
||||
@@ -11,8 +11,9 @@ namespace MxGateway.Tests.Security.Authentication;
|
||||
/// <summary>
|
||||
/// Tests for <see cref="SqliteAuthStore"/>.
|
||||
/// </summary>
|
||||
public sealed class SqliteAuthStoreTests
|
||||
public sealed class SqliteAuthStoreTests : IDisposable
|
||||
{
|
||||
private readonly List<TempDatabaseDirectory> _tempDirectories = [];
|
||||
/// <summary>
|
||||
/// Verifies that MigrateAsync initializes the database schema.
|
||||
/// </summary>
|
||||
@@ -167,12 +168,23 @@ public sealed class SqliteAuthStoreTests
|
||||
return services.BuildServiceProvider(validateScopes: true);
|
||||
}
|
||||
|
||||
private static string CreateTempDatabasePath()
|
||||
/// <summary>Clears SQLite pools and deletes every temporary directory created by this test.</summary>
|
||||
public void Dispose()
|
||||
{
|
||||
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-tests", Guid.NewGuid().ToString("N"));
|
||||
Directory.CreateDirectory(directory);
|
||||
foreach (TempDatabaseDirectory directory in _tempDirectories)
|
||||
{
|
||||
directory.Dispose();
|
||||
}
|
||||
|
||||
return Path.Combine(directory, "gateway-auth.db");
|
||||
_tempDirectories.Clear();
|
||||
}
|
||||
|
||||
private string CreateTempDatabasePath()
|
||||
{
|
||||
TempDatabaseDirectory directory = TempDatabaseDirectory.Create("mxgateway-auth-tests");
|
||||
_tempDirectories.Add(directory);
|
||||
|
||||
return directory.DatabasePath();
|
||||
}
|
||||
|
||||
private static async Task CreateVersionZeroDatabaseAsync(string databasePath)
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
|
||||
namespace MxGateway.Tests.Security.Authentication;
|
||||
|
||||
/// <summary>
|
||||
/// Disposable temporary directory for SQLite auth-store tests. Each instance owns a
|
||||
/// unique directory under <c>%TEMP%</c>; <see cref="Dispose"/> clears SQLite connection
|
||||
/// pools (which otherwise keep the <c>.db</c> file handle open) and deletes the directory
|
||||
/// so test runs do not leak temp files or open handles.
|
||||
/// </summary>
|
||||
internal sealed class TempDatabaseDirectory : IDisposable
|
||||
{
|
||||
private bool _disposed;
|
||||
|
||||
private TempDatabaseDirectory(string path)
|
||||
{
|
||||
Path = path;
|
||||
}
|
||||
|
||||
/// <summary>Gets the path to the temporary directory.</summary>
|
||||
public string Path { get; }
|
||||
|
||||
/// <summary>Creates a new uniquely named temporary directory under the given prefix.</summary>
|
||||
/// <param name="prefix">Folder name placed under <c>%TEMP%</c> to group related test directories.</param>
|
||||
public static TempDatabaseDirectory Create(string prefix)
|
||||
{
|
||||
string path = System.IO.Path.Combine(
|
||||
System.IO.Path.GetTempPath(),
|
||||
prefix,
|
||||
Guid.NewGuid().ToString("N"));
|
||||
Directory.CreateDirectory(path);
|
||||
|
||||
return new TempDatabaseDirectory(path);
|
||||
}
|
||||
|
||||
/// <summary>Returns a database file path inside this temporary directory.</summary>
|
||||
/// <param name="fileName">Database file name; defaults to the gateway auth database name.</param>
|
||||
public string DatabasePath(string fileName = "gateway-auth.db")
|
||||
{
|
||||
return System.IO.Path.Combine(Path, fileName);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
|
||||
// Microsoft.Data.Sqlite pools connections by default; clear the pools so the
|
||||
// underlying file handle is released before the directory is deleted.
|
||||
SqliteConnection.ClearAllPools();
|
||||
|
||||
try
|
||||
{
|
||||
if (Directory.Exists(Path))
|
||||
{
|
||||
Directory.Delete(Path, recursive: true);
|
||||
}
|
||||
}
|
||||
catch (IOException)
|
||||
{
|
||||
// Best-effort cleanup; a transient handle should not fail the test.
|
||||
}
|
||||
catch (UnauthorizedAccessException)
|
||||
{
|
||||
// Best-effort cleanup; a transient handle should not fail the test.
|
||||
}
|
||||
}
|
||||
}
|
||||
+242
@@ -1,9 +1,15 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Grpc;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Security.Authentication;
|
||||
using MxGateway.Server.Security.Authorization;
|
||||
using MxGateway.Server.Sessions;
|
||||
|
||||
namespace MxGateway.Tests.Security.Authorization;
|
||||
|
||||
@@ -156,6 +162,110 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
Assert.Null(identityAccessor.Current);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end composition test: runs an <c>OpenSession</c> call through the real
|
||||
/// interceptor in front of the real <see cref="MxAccessGatewayService"/> with a key
|
||||
/// that lacks the <c>session:open</c> scope, and asserts the interceptor denies the
|
||||
/// call with <see cref="StatusCode.PermissionDenied"/> before the service runs.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InterceptorComposedWithService_OpenSessionMissingScope_DeniesBeforeServiceRuns()
|
||||
{
|
||||
GatewayRequestIdentityAccessor identityAccessor = new();
|
||||
RecordingSessionManager sessionManager = new();
|
||||
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
|
||||
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
|
||||
identityAccessor);
|
||||
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
() => interceptor.UnaryServerHandler(
|
||||
new OpenSessionRequest { ClientSessionName = "operator-session" },
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(request, context) => service.OpenSession(request, context)));
|
||||
|
||||
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
|
||||
Assert.Contains(GatewayScopes.SessionOpen, exception.Status.Detail, StringComparison.Ordinal);
|
||||
Assert.Equal(0, sessionManager.OpenSessionCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end composition test: runs an <c>OpenSession</c> call through the real
|
||||
/// interceptor in front of the real <see cref="MxAccessGatewayService"/> with a key
|
||||
/// that holds <c>session:open</c>, and asserts the service runs and observes the
|
||||
/// interceptor-supplied identity.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InterceptorComposedWithService_OpenSessionWithScope_RunsServiceWithIdentity()
|
||||
{
|
||||
GatewayRequestIdentityAccessor identityAccessor = new();
|
||||
RecordingSessionManager sessionManager = new();
|
||||
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
|
||||
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.SessionOpen)),
|
||||
identityAccessor);
|
||||
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
|
||||
|
||||
OpenSessionReply reply = await interceptor.UnaryServerHandler(
|
||||
new OpenSessionRequest { ClientSessionName = "operator-session" },
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(request, context) => service.OpenSession(request, context));
|
||||
|
||||
Assert.Equal("session-1", reply.SessionId);
|
||||
Assert.Equal(1, sessionManager.OpenSessionCount);
|
||||
Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end composition test: an <c>Invoke</c> call through the real interceptor in
|
||||
/// front of the real service with a key holding only <c>invoke:read</c> is denied
|
||||
/// because the wrapped command is a write, confirming command-scope mapping is
|
||||
/// enforced through the full composition.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InterceptorComposedWithService_InvokeWriteCommandWithReadScope_DeniesBeforeServiceRuns()
|
||||
{
|
||||
GatewayRequestIdentityAccessor identityAccessor = new();
|
||||
RecordingSessionManager sessionManager = new();
|
||||
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
|
||||
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.InvokeRead)),
|
||||
identityAccessor);
|
||||
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
|
||||
MxCommandRequest request = new()
|
||||
{
|
||||
SessionId = "session-1",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write,
|
||||
Write = new WriteCommand { ServerHandle = 1, ItemHandle = 2 },
|
||||
},
|
||||
};
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
() => interceptor.UnaryServerHandler(
|
||||
request,
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(req, context) => service.Invoke(req, context)));
|
||||
|
||||
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
|
||||
Assert.Contains(GatewayScopes.InvokeWrite, exception.Status.Detail, StringComparison.Ordinal);
|
||||
Assert.Equal(0, sessionManager.InvokeCount);
|
||||
}
|
||||
|
||||
private static MxAccessGatewayService CreateService(
|
||||
ISessionManager sessionManager,
|
||||
IGatewayRequestIdentityAccessor identityAccessor)
|
||||
{
|
||||
return new MxAccessGatewayService(
|
||||
sessionManager,
|
||||
identityAccessor,
|
||||
new AllowAllConstraintEnforcer(),
|
||||
new MxAccessGrpcRequestValidator(),
|
||||
new MxAccessGrpcMapper(),
|
||||
new NoOpEventStreamService(),
|
||||
new GatewayMetrics(),
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
}
|
||||
|
||||
private static GatewayGrpcAuthorizationInterceptor CreateInterceptor(
|
||||
IApiKeyVerifier apiKeyVerifier,
|
||||
IGatewayRequestIdentityAccessor identityAccessor,
|
||||
@@ -188,6 +298,138 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
return new TestServerCallContext([new Metadata.Entry("authorization", authorizationHeader)]);
|
||||
}
|
||||
|
||||
/// <summary>Records whether the gateway service ran past the interceptor for composition tests.</summary>
|
||||
private sealed class RecordingSessionManager : ISessionManager
|
||||
{
|
||||
/// <summary>Gets the number of times OpenSessionAsync was invoked.</summary>
|
||||
public int OpenSessionCount { get; private set; }
|
||||
|
||||
/// <summary>Gets the number of times InvokeAsync was invoked.</summary>
|
||||
public int InvokeCount { get; private set; }
|
||||
|
||||
/// <summary>Gets the last client identity passed to OpenSessionAsync.</summary>
|
||||
public string? LastClientIdentity { get; private set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
OpenSessionCount++;
|
||||
LastClientIdentity = clientIdentity;
|
||||
|
||||
GatewaySession session = new(
|
||||
"session-1",
|
||||
GatewayContractInfo.DefaultBackendName,
|
||||
"pipe",
|
||||
"nonce",
|
||||
clientIdentity ?? "client",
|
||||
"client-session",
|
||||
"client-correlation",
|
||||
TimeSpan.FromSeconds(7),
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(10),
|
||||
DateTimeOffset.UtcNow);
|
||||
|
||||
return Task.FromResult(session);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public bool TryGetSession(string sessionId, out GatewaySession session)
|
||||
{
|
||||
session = null!;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
string sessionId,
|
||||
WorkerCommand command,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
InvokeCount++;
|
||||
return Task.FromResult(new WorkerCommandReply());
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
string sessionId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return AsyncEnumerable.Empty<WorkerEvent>();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<SessionCloseResult> CloseSessionAsync(
|
||||
string sessionId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<int> CloseExpiredLeasesAsync(
|
||||
DateTimeOffset now,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Event stream service that yields nothing; alarm/event RPCs are not under test here.</summary>
|
||||
private sealed class NoOpEventStreamService : IEventStreamService
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Constraint enforcer that permits every operation for composition tests.</summary>
|
||||
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task<ConstraintFailure?> CheckReadTagAsync(
|
||||
ApiKeyIdentity? identity,
|
||||
string tagAddress,
|
||||
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<ConstraintFailure?> CheckReadHandleAsync(
|
||||
ApiKeyIdentity? identity,
|
||||
GatewaySession session,
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<ConstraintFailure?> CheckWriteHandleAsync(
|
||||
ApiKeyIdentity? identity,
|
||||
GatewaySession session,
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task RecordDenialAsync(
|
||||
ApiKeyIdentity? identity,
|
||||
string commandKind,
|
||||
string target,
|
||||
ConstraintFailure failure,
|
||||
CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
|
||||
private sealed class FakeApiKeyVerifier(ApiKeyVerificationResult result) : IApiKeyVerifier
|
||||
{
|
||||
/// <summary>Gets whether the verifier was called.</summary>
|
||||
|
||||
@@ -61,4 +61,42 @@ public sealed class GatewayGrpcScopeResolverTests
|
||||
|
||||
Assert.Equal(expectedScope, scope);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that an unmapped request type fails closed: the resolver returns the
|
||||
/// most-restrictive <see cref="GatewayScopes.Admin"/> scope rather than a permissive
|
||||
/// default, so a newly added RPC that is never mapped is denied to ordinary keys.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void ResolveRequiredScope_UnmappedRequestType_FailsClosedToAdminScope()
|
||||
{
|
||||
GatewayGrpcScopeResolver resolver = new();
|
||||
|
||||
string scope = resolver.ResolveRequiredScope(new UnmappedRequest());
|
||||
|
||||
Assert.Equal(GatewayScopes.Admin, scope);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that an <see cref="MxCommandRequest"/> with an unrecognized command kind
|
||||
/// resolves to the read scope rather than silently granting write or admin access.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void ResolveRequiredScope_UnknownInvokeCommandKind_ReturnsInvokeReadScope()
|
||||
{
|
||||
GatewayGrpcScopeResolver resolver = new();
|
||||
|
||||
string scope = resolver.ResolveRequiredScope(new MxCommandRequest
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = (MxCommandKind)9999,
|
||||
},
|
||||
});
|
||||
|
||||
Assert.Equal(GatewayScopes.InvokeRead, scope);
|
||||
}
|
||||
|
||||
/// <summary>Request type intentionally not mapped by the scope resolver.</summary>
|
||||
private sealed class UnmappedRequest;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
@@ -19,7 +18,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new();
|
||||
using MemoryStream stream = new();
|
||||
WorkerEnvelope original = CreateGatewayHelloEnvelope();
|
||||
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
@@ -39,7 +38,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||
envelope.ProtocolVersion++;
|
||||
MemoryStream stream = new(CreateFrame(envelope));
|
||||
using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -56,7 +55,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||
envelope.SessionId = "different-session";
|
||||
MemoryStream stream = new(CreateFrame(envelope));
|
||||
using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -71,7 +70,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task ReadAsync_WithMalformedLength_ThrowsMalformedLength()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||
using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -86,7 +85,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
|
||||
using MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -101,7 +100,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task WriteAsync_WithConcurrentCalls_SerializesCompleteFrames()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new();
|
||||
using MemoryStream stream = new();
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
|
||||
await Task.WhenAll(
|
||||
|
||||
@@ -24,10 +24,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithValidGatewayHello_SendsHelloThenReady()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -55,10 +55,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithWrongNonce_FaultsBeforeInitialization()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(nonce: "wrong"));
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -83,10 +83,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithWrongProtocol_FaultsBeforeInitialization()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(supportedProtocolVersion: 999));
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -110,8 +110,8 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithMalformedFrame_WritesWorkerFault()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -137,10 +137,10 @@ public sealed class WorkerPipeSessionTests
|
||||
{
|
||||
const int hresult = unchecked((int)0x80040154);
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
|
||||
await Assert.ThrowsAsync<COMException>(
|
||||
|
||||
@@ -293,7 +293,11 @@ public sealed class MxAccessStaSessionTests
|
||||
|
||||
/// <summary>
|
||||
/// Gap 2: Verifies that the STA poll loop stops when the session is disposed —
|
||||
/// no further PollOnce calls after disposal.
|
||||
/// no further PollOnce calls after disposal. <see cref="MxAccessStaSession.Dispose"/>
|
||||
/// joins the poll task before returning, so once Dispose returns no PollOnce
|
||||
/// call can still be in flight. The test asserts the poll count is frozen
|
||||
/// immediately after Dispose and stays frozen — deterministic, with no
|
||||
/// elapsed-time "no further polls" window that a slow agent could race.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Dispose_StopsAlarmPollLoop()
|
||||
@@ -302,7 +306,11 @@ public sealed class MxAccessStaSessionTests
|
||||
FakeMxAccessComObjectFactory factory = new();
|
||||
FakeMxAccessEventSink eventSink = new();
|
||||
using StaRuntime runtime = CreateRuntime();
|
||||
MxAccessStaSession session = new(
|
||||
// using declaration: if an assertion below throws before the explicit
|
||||
// Dispose, the session (its STA poll loop and alarm handler) is still
|
||||
// torn down. Dispose is idempotent, so the explicit call mid-test and
|
||||
// the using-scope call do not conflict.
|
||||
using MxAccessStaSession session = new(
|
||||
runtime,
|
||||
factory,
|
||||
eventSink,
|
||||
@@ -320,11 +328,15 @@ public sealed class MxAccessStaSessionTests
|
||||
|
||||
Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose.");
|
||||
|
||||
// Dispose joins the poll task; when it returns the loop has stopped
|
||||
// and no PollOnce call is still running.
|
||||
session.Dispose();
|
||||
int pollCountAtDispose = handler.PollCount;
|
||||
|
||||
// Wait 1 second and verify no further polls occur.
|
||||
await Task.Delay(1000);
|
||||
// The count is already frozen — re-reading after a yield must not
|
||||
// observe any further poll. This is a deterministic check, not a
|
||||
// timing window: a poll cannot start once the joined loop has exited.
|
||||
await Task.Yield();
|
||||
Assert.Equal(pollCountAtDispose, handler.PollCount);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Worker.Sta;
|
||||
@@ -27,7 +26,15 @@ public sealed class StaRuntimeTests
|
||||
Assert.Equal(ApartmentState.STA, observation.ApartmentState);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that InvokeAsync wakes the idle pump when a command is queued.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that InvokeAsync wakes the idle pump when a command is queued.
|
||||
/// The pump is configured with a 30-second idle period — far longer than
|
||||
/// any reasonable test run — so the awaited command completing at all proves
|
||||
/// the command wake event (not the idle pump tick) drove the dispatch. No
|
||||
/// wall-clock assertion is used: a loaded CI agent can stall an otherwise
|
||||
/// correct dispatch past an arbitrary millisecond budget, which would be a
|
||||
/// false failure.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InvokeAsync_WakesIdlePumpForQueuedCommand()
|
||||
{
|
||||
@@ -37,15 +44,10 @@ public sealed class StaRuntimeTests
|
||||
new StaMessagePump(),
|
||||
TimeSpan.FromSeconds(30));
|
||||
runtime.Start();
|
||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||
|
||||
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
|
||||
|
||||
stopwatch.Stop();
|
||||
Assert.Equal(runtime.StaThreadId, threadId);
|
||||
Assert.True(
|
||||
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
|
||||
$"Command took {stopwatch.Elapsed} to execute, so the command wake event did not wake the STA promptly.");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that Shutdown stops the thread and uninitializes the COM apartment.</summary>
|
||||
|
||||
@@ -580,13 +580,27 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
||||
|
||||
RequestShutdown();
|
||||
|
||||
// Cancel and discard the STA poll loop.
|
||||
// Cancel the STA poll loop and join it before disposing the alarm
|
||||
// handler. Joining (rather than discarding alarmPollTask) makes the
|
||||
// stop deterministic: once Dispose returns, no further PollOnce calls
|
||||
// can be in flight, so callers and tests can rely on a frozen poll
|
||||
// count instead of an elapsed-time "no further polls" window.
|
||||
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
|
||||
Task? pollTaskToJoin = alarmPollTask;
|
||||
alarmPollCts = null;
|
||||
alarmPollTask = null;
|
||||
if (pollCtsToDispose is not null)
|
||||
{
|
||||
try { pollCtsToDispose.Cancel(); } catch { }
|
||||
if (pollTaskToJoin is not null)
|
||||
{
|
||||
try
|
||||
{
|
||||
pollTaskToJoin.Wait(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
catch (AggregateException) { }
|
||||
catch (ObjectDisposedException) { }
|
||||
}
|
||||
try { pollCtsToDispose.Dispose(); } catch { }
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user