Compare commits
4 Commits
430187c28b
...
581b541801
| Author | SHA1 | Date | |
|---|---|---|---|
| 581b541801 | |||
| d3cb311aae | |||
| 186d03e5cc | |||
| 6bae5ea3a3 |
@@ -179,8 +179,8 @@ gradle :zb-mom-ww-mxgateway-cli:run --args="add-item --endpoint localhost:5000 -
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="advise --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="write --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-events --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --alarm-reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --filter-prefix Galaxy --limit 1 --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json"
|
||||
gradle :zb-mom-ww-mxgateway-cli:run --args="smoke --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --item TestObject.TestInt --json"
|
||||
```
|
||||
|
||||
|
||||
+138
-4
@@ -33,6 +33,7 @@ import java.util.Optional;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||
@@ -119,7 +120,7 @@ public final class MxGatewayCli implements Callable<Integer> {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
|
||||
static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
|
||||
CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory));
|
||||
commandLine.addSubcommand("version", new VersionCommand());
|
||||
commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory));
|
||||
@@ -154,6 +155,120 @@ public final class MxGatewayCli implements Callable<Integer> {
|
||||
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
|
||||
private static final Object ALARM_FEED_END = new Object();
|
||||
|
||||
/**
|
||||
* Tokenises a single batch-mode stdin line into the argv that the inner
|
||||
* {@link CommandLine} should execute. Honours single-quoted, double-quoted,
|
||||
* and backslash-escaped runs so values that contain spaces (e.g.
|
||||
* {@code --comment "needs verification"}) survive intact — the old
|
||||
* implementation used {@code split("\\s+")} which shredded any quoted
|
||||
* argument mid-string (Client.Java-034).
|
||||
*
|
||||
* <p>Rules (a small POSIX-like shell tokenizer; no variable expansion,
|
||||
* command substitution, globbing, or backtick handling):
|
||||
*
|
||||
* <ul>
|
||||
* <li>Outside quotes, runs of whitespace separate tokens.</li>
|
||||
* <li>{@code "..."} groups a sequence into one token; the surrounding
|
||||
* quotes are removed. Inside double quotes a backslash escapes
|
||||
* {@code \\}, {@code "}, and a literal newline; other characters
|
||||
* are taken literally (so {@code \n} is the two characters
|
||||
* backslash-n).</li>
|
||||
* <li>{@code '...'} groups a sequence into one token; the surrounding
|
||||
* quotes are removed. Inside single quotes nothing is escaped —
|
||||
* the run is literal until the matching single quote.</li>
|
||||
* <li>Outside quotes, backslash escapes the next character (including
|
||||
* whitespace, so {@code needs\ verification} is one token).</li>
|
||||
* <li>An unterminated quote or a trailing backslash throws
|
||||
* {@link IllegalArgumentException} so the batch loop surfaces it
|
||||
* as a JSON error instead of silently emitting wrong args.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>Empty input (or input that contains only whitespace) returns an
|
||||
* empty array so callers can skip the line.
|
||||
*/
|
||||
static String[] tokenizeBatchLine(String line) {
|
||||
List<String> tokens = new ArrayList<>();
|
||||
StringBuilder current = new StringBuilder();
|
||||
boolean inToken = false;
|
||||
// 0 = outside, 1 = inside single quotes, 2 = inside double quotes
|
||||
int quoteMode = 0;
|
||||
int length = line.length();
|
||||
for (int i = 0; i < length; i++) {
|
||||
char c = line.charAt(i);
|
||||
if (quoteMode == 1) {
|
||||
if (c == '\'') {
|
||||
quoteMode = 0;
|
||||
} else {
|
||||
current.append(c);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (quoteMode == 2) {
|
||||
if (c == '\\') {
|
||||
if (i + 1 >= length) {
|
||||
throw new IllegalArgumentException(
|
||||
"batch tokenizer: trailing backslash inside double-quoted string");
|
||||
}
|
||||
char next = line.charAt(i + 1);
|
||||
if (next == '\\' || next == '"' || next == '\n') {
|
||||
current.append(next);
|
||||
i++;
|
||||
} else {
|
||||
// POSIX rule: inside double quotes a backslash is
|
||||
// literal unless it precedes \, ", $, `, or newline.
|
||||
current.append(c);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (c == '"') {
|
||||
quoteMode = 0;
|
||||
continue;
|
||||
}
|
||||
current.append(c);
|
||||
continue;
|
||||
}
|
||||
// Outside any quotes.
|
||||
if (c == '\'') {
|
||||
quoteMode = 1;
|
||||
inToken = true;
|
||||
continue;
|
||||
}
|
||||
if (c == '"') {
|
||||
quoteMode = 2;
|
||||
inToken = true;
|
||||
continue;
|
||||
}
|
||||
if (c == '\\') {
|
||||
if (i + 1 >= length) {
|
||||
throw new IllegalArgumentException(
|
||||
"batch tokenizer: trailing backslash outside quotes");
|
||||
}
|
||||
current.append(line.charAt(i + 1));
|
||||
i++;
|
||||
inToken = true;
|
||||
continue;
|
||||
}
|
||||
if (Character.isWhitespace(c)) {
|
||||
if (inToken) {
|
||||
tokens.add(current.toString());
|
||||
current.setLength(0);
|
||||
inToken = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
current.append(c);
|
||||
inToken = true;
|
||||
}
|
||||
if (quoteMode != 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"batch tokenizer: unterminated " + (quoteMode == 1 ? "single" : "double") + " quote");
|
||||
}
|
||||
if (inToken) {
|
||||
tokens.add(current.toString());
|
||||
}
|
||||
return tokens.toArray(new String[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads one CLI invocation per stdin line, executes each via a fresh
|
||||
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
|
||||
@@ -183,8 +298,8 @@ public final class MxGatewayCli implements Callable<Integer> {
|
||||
if (line.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
String[] args = line.trim().split("\\s+");
|
||||
if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) {
|
||||
String[] args = tokenizeBatchLine(line);
|
||||
if (args.length == 0) {
|
||||
continue;
|
||||
}
|
||||
StringWriter cmdOut = new StringWriter();
|
||||
@@ -1079,11 +1194,29 @@ public final class MxGatewayCli implements Callable<Integer> {
|
||||
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
|
||||
.setAlarmFilterPrefix(filterPrefix)
|
||||
.build();
|
||||
// Client.Java-033 — fail-fast on overflow. A bare
|
||||
// queue.offer(value) silently drops messages past capacity,
|
||||
// which violates the JavaStyleGuide "do not drop events"
|
||||
// contract and lets the CLI exit 0 on a truncated feed.
|
||||
// Mirrors MxEventStream's overflow branch: detect a failed
|
||||
// offer, cancel the subscription, drain the buffer, then
|
||||
// queue an explicit overflow exception followed by the END
|
||||
// sentinel so the drain loop surfaces a non-zero exit.
|
||||
AtomicReference<MxGatewayAlarmFeedSubscription> subscriptionRef = new AtomicReference<>();
|
||||
MxGatewayAlarmFeedSubscription subscription =
|
||||
client.streamAlarms(request, new StreamObserver<>() {
|
||||
@Override
|
||||
public void onNext(AlarmFeedMessage value) {
|
||||
queue.offer(value);
|
||||
if (!queue.offer(value)) {
|
||||
MxGatewayAlarmFeedSubscription sub = subscriptionRef.get();
|
||||
if (sub != null) {
|
||||
sub.cancel();
|
||||
}
|
||||
queue.clear();
|
||||
queue.offer(new IllegalStateException(
|
||||
"stream-alarms queue overflowed (capacity 1024); consumer too slow"));
|
||||
queue.offer(ALARM_FEED_END);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1096,6 +1229,7 @@ public final class MxGatewayCli implements Callable<Integer> {
|
||||
queue.offer(ALARM_FEED_END);
|
||||
}
|
||||
});
|
||||
subscriptionRef.set(subscription);
|
||||
try {
|
||||
int count = 0;
|
||||
while (true) {
|
||||
|
||||
+216
@@ -225,6 +225,89 @@ final class MxGatewayCliTests {
|
||||
assertTrue(run.errors().contains("--reference"), run.errors());
|
||||
}
|
||||
|
||||
@Test
|
||||
void readmeDocumentedStreamAlarmsExampleParsesCleanly() {
|
||||
// Client.Java-032 regression — the README's stream-alarms example
|
||||
// (clients/java/README.md:182) must round-trip through picocli's
|
||||
// parser without a parse error. Before the fix, the example used
|
||||
// a non-existent --session-id option and picocli failed at parse
|
||||
// time. This test pins the exact tokens documented in the README.
|
||||
String[] args = {
|
||||
"stream-alarms",
|
||||
"--endpoint",
|
||||
"localhost:5000",
|
||||
"--api-key-env",
|
||||
"MXGATEWAY_API_KEY",
|
||||
"--plaintext",
|
||||
"--filter-prefix",
|
||||
"Galaxy",
|
||||
"--limit",
|
||||
"1",
|
||||
"--json"
|
||||
};
|
||||
assertReadmeExampleParses(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
void readmeDocumentedAcknowledgeAlarmExampleParsesCleanly() {
|
||||
// Client.Java-032 regression — the README's acknowledge-alarm
|
||||
// example (clients/java/README.md:183) must parse without error.
|
||||
// Before the fix it used --session-id (no such option) and
|
||||
// --alarm-reference (the real option is --reference), so picocli
|
||||
// rejected the invocation immediately.
|
||||
String[] args = {
|
||||
"acknowledge-alarm",
|
||||
"--endpoint",
|
||||
"localhost:5000",
|
||||
"--api-key-env",
|
||||
"MXGATEWAY_API_KEY",
|
||||
"--plaintext",
|
||||
"--reference",
|
||||
"\\Galaxy\\Area001.Pump001.PumpFault",
|
||||
"--json"
|
||||
};
|
||||
assertReadmeExampleParses(args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the given args through the production picocli {@link CommandLine}
|
||||
* and asserts no parser error, no unknown option, and no missing required
|
||||
* option. Does not execute the command body — only the option / subcommand
|
||||
* parser is exercised, so no network call is made.
|
||||
*/
|
||||
private static void assertReadmeExampleParses(String[] args) {
|
||||
picocli.CommandLine commandLine = MxGatewayCli.commandLine(new FakeClientFactory());
|
||||
try {
|
||||
commandLine.parseArgs(args);
|
||||
} catch (picocli.CommandLine.ParameterException ex) {
|
||||
throw new AssertionError(
|
||||
"documented README invocation failed picocli parse: "
|
||||
+ String.join(" ", args)
|
||||
+ " -> "
|
||||
+ ex.getMessage(),
|
||||
ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void streamAlarmsCommandFailsFastOnQueueOverflow() {
|
||||
// Client.Java-033 regression — the CLI's stream-alarms bounded queue
|
||||
// used queue.offer(value) which silently dropped messages past
|
||||
// capacity (1024). After the fix the CLI must surface the overflow
|
||||
// as a non-zero exit (mirroring MxEventStream's fail-fast contract).
|
||||
//
|
||||
// The OverflowingFakeClient floods the gRPC observer with 2000
|
||||
// messages synchronously, which exceeds the bounded 1024-element
|
||||
// queue. The fix detects the failed offer, cancels the subscription,
|
||||
// queues an overflow exception, and the drain loop surfaces it.
|
||||
OverflowingFakeClientFactory factory = new OverflowingFakeClientFactory();
|
||||
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Flood");
|
||||
|
||||
assertFalse(run.exitCode() == 0,
|
||||
"expected non-zero exit when the alarm queue overflows; got exit=" + run.exitCode()
|
||||
+ " out=\n" + run.output() + "\nerr=\n" + run.errors());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandExecutesVersionAndEmitsEorMarker() {
|
||||
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
|
||||
@@ -235,6 +318,68 @@ final class MxGatewayCliTests {
|
||||
assertTrue(out.contains(MxGatewayCli.BATCH_EOR), out);
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandTokenisesDoubleQuotedArgumentWithEmbeddedSpaces() {
|
||||
// Client.Java-034 regression — a real shell-style tokenizer must not
|
||||
// shred `"needs verification"` into two arguments. Drives
|
||||
// acknowledge-alarm through batch and asserts the captured --comment
|
||||
// is the un-quoted string with the embedded space preserved.
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"needs verification\" --operator op1\n";
|
||||
CliRun run = executeBatch(factory, line);
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||
assertEquals("op1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
|
||||
assertEquals(
|
||||
"Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandTokenisesSingleQuotedArgumentWithEmbeddedSpaces() {
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
String line =
|
||||
"acknowledge-alarm --reference Tank01.Level.HiHi --comment 'needs verification' --operator op1\n";
|
||||
CliRun run = executeBatch(factory, line);
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandTokenisesBackslashEscapedSpaceOutsideQuotes() {
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
String line =
|
||||
"acknowledge-alarm --reference Tank01.Level.HiHi --comment needs\\ verification\n";
|
||||
CliRun run = executeBatch(factory, line);
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandPreservesEmptyQuotedArgument() {
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"\"\n";
|
||||
CliRun run = executeBatch(factory, line);
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandSupportsBackslashEscapedQuoteInsideDoubleQuotes() {
|
||||
// `--comment "with \"inner\" quote"` should round-trip the inner
|
||||
// double-quote into the comment string.
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
String line =
|
||||
"acknowledge-alarm --reference Tank01.Level.HiHi --comment \"with \\\"inner\\\" quote\"\n";
|
||||
CliRun run = executeBatch(factory, line);
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("with \"inner\" quote", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||
}
|
||||
|
||||
@Test
|
||||
void batchCommandEmitsEorAfterFailedCommandAndContinues() {
|
||||
// An unknown subcommand causes a picocli parse error (non-zero exit).
|
||||
@@ -290,6 +435,77 @@ final class MxGatewayCliTests {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory whose fake client floods the {@code streamAlarms} observer with
|
||||
* 2000 messages synchronously, exceeding the CLI's bounded 1024-element
|
||||
* queue. Used by the Client.Java-033 fail-fast overflow regression.
|
||||
*/
|
||||
private static final class OverflowingFakeClientFactory implements MxGatewayCli.MxGatewayCliClientFactory {
|
||||
@Override
|
||||
public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) {
|
||||
return new OverflowingFakeClient(options.spec.commandLine().getOut());
|
||||
}
|
||||
}
|
||||
|
||||
private static final class OverflowingFakeClient implements MxGatewayCli.MxGatewayCliClient {
|
||||
private final PrintWriter out;
|
||||
|
||||
OverflowingFakeClient(PrintWriter out) {
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrintWriter out() {
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenSessionReply openSession(OpenSessionRequest request) {
|
||||
return OpenSessionReply.newBuilder().setSessionId("flood-session").setProtocolStatus(ok()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseSessionReply closeSession(CloseSessionRequest request) {
|
||||
return CloseSessionReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setFinalState(SessionState.SESSION_STATE_CLOSED)
|
||||
.setProtocolStatus(ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxGatewayCli.MxGatewayCliSession session(String sessionId) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxGatewayAlarmFeedSubscription streamAlarms(
|
||||
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
|
||||
// Synchronously push 2000 messages to overflow the CLI's bounded
|
||||
// 1024-element queue. The CLI must surface the overflow rather
|
||||
// than silently dropping the trailing ~976 messages.
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
observer.onNext(AlarmFeedMessage.newBuilder()
|
||||
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
|
||||
.setAlarmFullReference("Flood." + i)
|
||||
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
|
||||
.setSeverity(700))
|
||||
.build());
|
||||
}
|
||||
observer.onCompleted();
|
||||
return new MxGatewayAlarmFeedSubscription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FakeClient implements MxGatewayCli.MxGatewayCliClient {
|
||||
private final PrintWriter out;
|
||||
private final FakeSession session = new FakeSession();
|
||||
|
||||
+7
-52
@@ -2,64 +2,19 @@ package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
|
||||
import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Cancellable handle returned by the async {@code watchDeployEvents} variant.
|
||||
* Mirrors {@link MxGatewayEventSubscription} but for the Galaxy Repository
|
||||
* deploy-event stream.
|
||||
*
|
||||
* <p>All lifecycle / cancellation behaviour is inherited from
|
||||
* {@link MxGatewayStreamSubscription} (Client.Java-036).
|
||||
*/
|
||||
public final class DeployEventSubscription implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<WatchDeployEventsRequest>> requestStream =
|
||||
new AtomicReference<>();
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||
|
||||
ClientResponseObserver<WatchDeployEventsRequest, DeployEvent> wrap(StreamObserver<DeployEvent> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<WatchDeployEventsRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
if (cancelled.get()) {
|
||||
stream.cancel("client cancelled deploy event stream", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(DeployEvent value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||
* started; cancellation is recorded and applied as soon as the stream
|
||||
* attaches.
|
||||
*/
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
ClientCallStreamObserver<WatchDeployEventsRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled deploy event stream", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancel();
|
||||
public final class DeployEventSubscription
|
||||
extends MxGatewayStreamSubscription<WatchDeployEventsRequest, DeployEvent> {
|
||||
public DeployEventSubscription() {
|
||||
super("client cancelled deploy event stream");
|
||||
}
|
||||
}
|
||||
|
||||
+7
-51
@@ -1,10 +1,6 @@
|
||||
package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
||||
|
||||
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
||||
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
|
||||
* subscription also implements {@link AutoCloseable} so it can participate in
|
||||
* try-with-resources blocks.
|
||||
*
|
||||
* <p>All lifecycle / cancellation behaviour is inherited from
|
||||
* {@link MxGatewayStreamSubscription} (Client.Java-036).
|
||||
*/
|
||||
public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<QueryActiveAlarmsRequest>> requestStream = new AtomicReference<>();
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||
|
||||
ClientResponseObserver<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> wrap(StreamObserver<ActiveAlarmSnapshot> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<QueryActiveAlarmsRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
if (cancelled.get()) {
|
||||
stream.cancel("client cancelled active-alarms query", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ActiveAlarmSnapshot value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||
* started; cancellation is recorded and applied as soon as the stream
|
||||
* attaches.
|
||||
*/
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
ClientCallStreamObserver<QueryActiveAlarmsRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled active-alarms query", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancel();
|
||||
public final class MxGatewayActiveAlarmsSubscription
|
||||
extends MxGatewayStreamSubscription<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> {
|
||||
public MxGatewayActiveAlarmsSubscription() {
|
||||
super("client cancelled active-alarms query");
|
||||
}
|
||||
}
|
||||
|
||||
+7
-51
@@ -1,10 +1,6 @@
|
||||
package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||
|
||||
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
|
||||
* subscription also implements {@link AutoCloseable} so it can participate in
|
||||
* try-with-resources blocks.
|
||||
*
|
||||
* <p>All lifecycle / cancellation behaviour is inherited from
|
||||
* {@link MxGatewayStreamSubscription} (Client.Java-036).
|
||||
*/
|
||||
public final class MxGatewayAlarmFeedSubscription implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<StreamAlarmsRequest>> requestStream = new AtomicReference<>();
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||
|
||||
ClientResponseObserver<StreamAlarmsRequest, AlarmFeedMessage> wrap(StreamObserver<AlarmFeedMessage> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<StreamAlarmsRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
if (cancelled.get()) {
|
||||
stream.cancel("client cancelled alarm feed", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(AlarmFeedMessage value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||
* started; cancellation is recorded and applied as soon as the stream
|
||||
* attaches.
|
||||
*/
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
ClientCallStreamObserver<StreamAlarmsRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled alarm feed", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancel();
|
||||
public final class MxGatewayAlarmFeedSubscription
|
||||
extends MxGatewayStreamSubscription<StreamAlarmsRequest, AlarmFeedMessage> {
|
||||
public MxGatewayAlarmFeedSubscription() {
|
||||
super("client cancelled alarm feed");
|
||||
}
|
||||
}
|
||||
|
||||
+7
-51
@@ -1,10 +1,6 @@
|
||||
package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
|
||||
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
|
||||
* subscription also implements {@link AutoCloseable} so it can participate in
|
||||
* try-with-resources blocks.
|
||||
*
|
||||
* <p>All lifecycle / cancellation behaviour is inherited from
|
||||
* {@link MxGatewayStreamSubscription} (Client.Java-036).
|
||||
*/
|
||||
public final class MxGatewayEventSubscription implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<StreamEventsRequest>> requestStream = new AtomicReference<>();
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||
|
||||
ClientResponseObserver<StreamEventsRequest, MxEvent> wrap(StreamObserver<MxEvent> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
if (cancelled.get()) {
|
||||
stream.cancel("client cancelled event stream", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(MxEvent value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||
* started; cancellation is recorded and applied as soon as the stream
|
||||
* attaches.
|
||||
*/
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled event stream", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancel();
|
||||
public final class MxGatewayEventSubscription
|
||||
extends MxGatewayStreamSubscription<StreamEventsRequest, MxEvent> {
|
||||
public MxGatewayEventSubscription() {
|
||||
super("client cancelled event stream");
|
||||
}
|
||||
}
|
||||
|
||||
+89
@@ -0,0 +1,89 @@
|
||||
package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Shared base for the cancellable subscription handles returned by the
|
||||
* async-style server-streaming RPCs ({@code streamEvents}, {@code streamAlarms},
|
||||
* {@code queryActiveAlarms}, {@code watchDeployEvents}).
|
||||
*
|
||||
* <p>All four subscription classes share the same lifecycle and cancellation
|
||||
* contract:
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@link #wrap(StreamObserver)} returns a {@link ClientResponseObserver}
|
||||
* that captures the underlying {@link ClientCallStreamObserver} in
|
||||
* {@code beforeStart}. If {@link #cancel()} was called before the gRPC
|
||||
* call attached, the stream is cancelled eagerly inside
|
||||
* {@code beforeStart} (the Client.Java-014 close-before-beforeStart
|
||||
* fix).</li>
|
||||
* <li>{@link #cancel()} is idempotent. It records the cancellation flag and
|
||||
* forwards {@code cancel(message, cause)} to the underlying stream when
|
||||
* one is attached; otherwise the flag is checked in {@code beforeStart}
|
||||
* once the stream attaches.</li>
|
||||
* <li>{@link #close()} delegates to {@link #cancel()} so the handle can be
|
||||
* used with try-with-resources.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>Subclasses supply only the cancel-message string used by {@code cancel()}.
|
||||
* Refactor introduced for Client.Java-036 — the four prior subscription
|
||||
* classes were structural near-clones (~60 lines each).
|
||||
*/
|
||||
abstract class MxGatewayStreamSubscription<TRequest, TResponse> implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<TRequest>> requestStream = new AtomicReference<>();
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||
private final String cancelMessage;
|
||||
|
||||
MxGatewayStreamSubscription(String cancelMessage) {
|
||||
this.cancelMessage = cancelMessage;
|
||||
}
|
||||
|
||||
final ClientResponseObserver<TRequest, TResponse> wrap(StreamObserver<TResponse> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<TRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
if (cancelled.get()) {
|
||||
stream.cancel(cancelMessage, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(TResponse value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||
* started; cancellation is recorded and applied as soon as the stream
|
||||
* attaches.
|
||||
*/
|
||||
public final void cancel() {
|
||||
cancelled.set(true);
|
||||
ClientCallStreamObserver<TRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel(cancelMessage, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void close() {
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
+98
@@ -27,7 +27,10 @@ import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.BulkSubscribeReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
||||
@@ -41,6 +44,7 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -268,6 +272,100 @@ final class MxGatewayClientSessionTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages() throws Exception {
|
||||
AtomicReference<StreamAlarmsRequest> streamRequest = new AtomicReference<>();
|
||||
CountDownLatch serverCancelled = new CountDownLatch(1);
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
@Override
|
||||
public void streamAlarms(
|
||||
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> responseObserver) {
|
||||
streamRequest.set(request);
|
||||
ServerCallStreamObserver<AlarmFeedMessage> server =
|
||||
(ServerCallStreamObserver<AlarmFeedMessage>) responseObserver;
|
||||
server.setOnCancelHandler(serverCancelled::countDown);
|
||||
// Active-alarm snapshot, snapshot-complete sentinel, then a
|
||||
// transition — mirrors the shape of a real alarm feed open.
|
||||
server.onNext(AlarmFeedMessage.newBuilder()
|
||||
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
|
||||
.setAlarmFullReference("Tank01.Level.HiHi")
|
||||
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
|
||||
.setSeverity(700))
|
||||
.build());
|
||||
server.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
|
||||
server.onNext(AlarmFeedMessage.newBuilder()
|
||||
.setTransition(OnAlarmTransitionEvent.newBuilder()
|
||||
.setAlarmFullReference("Tank01.Level.HiHi")
|
||||
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
|
||||
.setSeverity(700))
|
||||
.build());
|
||||
// Note: we deliberately do NOT call onCompleted() so the call
|
||||
// remains open for the cancellation assertion below.
|
||||
}
|
||||
};
|
||||
|
||||
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
|
||||
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
|
||||
java.util.List<AlarmFeedMessage> received = new java.util.ArrayList<>();
|
||||
AtomicReference<Throwable> errorRef = new AtomicReference<>();
|
||||
CountDownLatch threeReceived = new CountDownLatch(3);
|
||||
|
||||
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
|
||||
.setAlarmFilterPrefix("Tank01")
|
||||
.build();
|
||||
|
||||
MxGatewayAlarmFeedSubscription subscription = client.streamAlarms(
|
||||
request,
|
||||
new StreamObserver<>() {
|
||||
@Override
|
||||
public void onNext(AlarmFeedMessage value) {
|
||||
received.add(value);
|
||||
threeReceived.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
errorRef.set(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(threeReceived.await(5, TimeUnit.SECONDS),
|
||||
"expected three alarm feed messages within 5s");
|
||||
|
||||
// The request shape (filter prefix in particular) must reach the
|
||||
// server — proves MxGatewayClient.streamAlarms calls the production
|
||||
// subscription.wrap(observer) glue and not a CLI override.
|
||||
assertNotNull(streamRequest.get());
|
||||
assertEquals("Tank01", streamRequest.get().getAlarmFilterPrefix());
|
||||
|
||||
// Order and payload-case must be preserved (the wrapping observer
|
||||
// is just a pass-through).
|
||||
assertEquals(3, received.size());
|
||||
assertEquals(AlarmFeedMessage.PayloadCase.ACTIVE_ALARM, received.get(0).getPayloadCase());
|
||||
assertEquals(
|
||||
"Tank01.Level.HiHi",
|
||||
received.get(0).getActiveAlarm().getAlarmFullReference());
|
||||
assertEquals(AlarmFeedMessage.PayloadCase.SNAPSHOT_COMPLETE, received.get(1).getPayloadCase());
|
||||
assertEquals(AlarmFeedMessage.PayloadCase.TRANSITION, received.get(2).getPayloadCase());
|
||||
assertEquals(
|
||||
AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE,
|
||||
received.get(2).getTransition().getTransitionKind());
|
||||
|
||||
// No error expected before cancellation — proves the wrapping
|
||||
// observer forwarded only data, not a synthetic error.
|
||||
assertNull(errorRef.get(), "no error expected before cancellation");
|
||||
|
||||
// Cancellation must propagate to the underlying gRPC call.
|
||||
subscription.cancel();
|
||||
assertTrue(serverCancelled.await(5, TimeUnit.SECONDS),
|
||||
"server should observe RPC cancellation after subscription.cancel()");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void commandFailureKeepsRawReply() throws Exception {
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
|
||||
+275
@@ -0,0 +1,275 @@
|
||||
package com.zb.mom.ww.mxgateway.client;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
|
||||
import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* Lifecycle / cancellation contract tests applied uniformly to each of the
|
||||
* four subscription classes that extend {@link MxGatewayStreamSubscription}.
|
||||
*
|
||||
* <p>Locks in the Client.Java-036 refactor: every subclass must exhibit the
|
||||
* same behaviour for (a) cancel-before-beforeStart eagerly cancelling the
|
||||
* stream once it attaches, (b) cancel-after-beforeStart forwarding directly
|
||||
* to the stream, (c) the cancel message matching the subclass's documented
|
||||
* value, (d) {@code close()} delegating to {@code cancel()}, and (e) the
|
||||
* wrapping observer forwarding {@code onNext}/{@code onError}/{@code onCompleted}
|
||||
* to the caller's observer.
|
||||
*/
|
||||
final class MxGatewayStreamSubscriptionContractTests {
|
||||
|
||||
@Test
|
||||
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_eventSubscription() {
|
||||
runCancelBeforeBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_alarmFeedSubscription() {
|
||||
runCancelBeforeBeforeStartTest(
|
||||
new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_activeAlarmsSubscription() {
|
||||
runCancelBeforeBeforeStartTest(
|
||||
new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_deployEventSubscription() {
|
||||
runCancelBeforeBeforeStartTest(
|
||||
new DeployEventSubscription(), "client cancelled deploy event stream");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelAfterBeforeStartForwardsToStream_eventSubscription() {
|
||||
runCancelAfterBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelAfterBeforeStartForwardsToStream_alarmFeedSubscription() {
|
||||
runCancelAfterBeforeStartTest(
|
||||
new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelAfterBeforeStartForwardsToStream_activeAlarmsSubscription() {
|
||||
runCancelAfterBeforeStartTest(
|
||||
new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query");
|
||||
}
|
||||
|
||||
@Test
|
||||
void cancelAfterBeforeStartForwardsToStream_deployEventSubscription() {
|
||||
runCancelAfterBeforeStartTest(
|
||||
new DeployEventSubscription(), "client cancelled deploy event stream");
|
||||
}
|
||||
|
||||
@Test
|
||||
void closeDelegatesToCancel_eventSubscription() {
|
||||
runCloseDelegatesToCancelTest(new MxGatewayEventSubscription());
|
||||
}
|
||||
|
||||
@Test
|
||||
void closeDelegatesToCancel_alarmFeedSubscription() {
|
||||
runCloseDelegatesToCancelTest(new MxGatewayAlarmFeedSubscription());
|
||||
}
|
||||
|
||||
@Test
|
||||
void closeDelegatesToCancel_activeAlarmsSubscription() {
|
||||
runCloseDelegatesToCancelTest(new MxGatewayActiveAlarmsSubscription());
|
||||
}
|
||||
|
||||
@Test
|
||||
void closeDelegatesToCancel_deployEventSubscription() {
|
||||
runCloseDelegatesToCancelTest(new DeployEventSubscription());
|
||||
}
|
||||
|
||||
@Test
|
||||
void wrappedObserverForwardsOnNextOnErrorOnCompleted_eventSubscription() {
|
||||
MxEvent event = MxEvent.newBuilder().setWorkerSequence(7L).build();
|
||||
runForwardingTest(new MxGatewayEventSubscription(), event);
|
||||
}
|
||||
|
||||
@Test
|
||||
void wrappedObserverForwardsOnNextOnErrorOnCompleted_alarmFeedSubscription() {
|
||||
AlarmFeedMessage msg = AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build();
|
||||
runForwardingTest(new MxGatewayAlarmFeedSubscription(), msg);
|
||||
}
|
||||
|
||||
@Test
|
||||
void wrappedObserverForwardsOnNextOnErrorOnCompleted_activeAlarmsSubscription() {
|
||||
ActiveAlarmSnapshot snap = ActiveAlarmSnapshot.newBuilder()
|
||||
.setAlarmFullReference("ref")
|
||||
.setSeverity(500)
|
||||
.build();
|
||||
runForwardingTest(new MxGatewayActiveAlarmsSubscription(), snap);
|
||||
}
|
||||
|
||||
@Test
|
||||
void wrappedObserverForwardsOnNextOnErrorOnCompleted_deployEventSubscription() {
|
||||
DeployEvent ev = DeployEvent.newBuilder().setSequence(1L).build();
|
||||
runForwardingTest(new DeployEventSubscription(), ev);
|
||||
}
|
||||
|
||||
private static <Req, Resp> void runCancelBeforeBeforeStartTest(
|
||||
MxGatewayStreamSubscription<Req, Resp> subscription, String expectedMessage) {
|
||||
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
|
||||
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
|
||||
|
||||
subscription.cancel();
|
||||
wrapped.beforeStart(stream);
|
||||
|
||||
assertTrue(stream.cancelled, "stream should have been cancelled by beforeStart after prior cancel()");
|
||||
assertEquals(expectedMessage, stream.cancelMessage);
|
||||
}
|
||||
|
||||
private static <Req, Resp> void runCancelAfterBeforeStartTest(
|
||||
MxGatewayStreamSubscription<Req, Resp> subscription, String expectedMessage) {
|
||||
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
|
||||
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
|
||||
|
||||
wrapped.beforeStart(stream);
|
||||
assertFalse(stream.cancelled, "stream should not be cancelled before cancel() is called");
|
||||
subscription.cancel();
|
||||
|
||||
assertTrue(stream.cancelled, "stream should have been cancelled by direct cancel()");
|
||||
assertEquals(expectedMessage, stream.cancelMessage);
|
||||
}
|
||||
|
||||
private static <Req, Resp> void runCloseDelegatesToCancelTest(
|
||||
MxGatewayStreamSubscription<Req, Resp> subscription) {
|
||||
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
|
||||
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
|
||||
|
||||
wrapped.beforeStart(stream);
|
||||
subscription.close();
|
||||
|
||||
assertTrue(stream.cancelled, "close() should delegate to cancel()");
|
||||
}
|
||||
|
||||
private static <Req, Resp> void runForwardingTest(
|
||||
MxGatewayStreamSubscription<Req, Resp> subscription, Resp value) {
|
||||
List<Resp> received = new ArrayList<>();
|
||||
AtomicReference<Throwable> errorRef = new AtomicReference<>();
|
||||
AtomicReference<Boolean> completed = new AtomicReference<>(false);
|
||||
|
||||
StreamObserver<Resp> caller = new StreamObserver<>() {
|
||||
@Override
|
||||
public void onNext(Resp v) {
|
||||
received.add(v);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
errorRef.set(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
completed.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(caller);
|
||||
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
|
||||
wrapped.beforeStart(stream);
|
||||
|
||||
wrapped.onNext(value);
|
||||
IllegalStateException boom = new IllegalStateException("boom");
|
||||
wrapped.onError(boom);
|
||||
wrapped.onCompleted();
|
||||
|
||||
assertEquals(1, received.size());
|
||||
assertEquals(value, received.get(0));
|
||||
assertNotNull(errorRef.get());
|
||||
assertEquals(boom, errorRef.get());
|
||||
assertTrue(completed.get());
|
||||
}
|
||||
|
||||
private static final class NoopObserver<T> implements StreamObserver<T> {
|
||||
@Override
|
||||
public void onNext(T value) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
}
|
||||
|
||||
private static final class RecordingClientCallStreamObserver<T> extends ClientCallStreamObserver<T> {
|
||||
boolean cancelled;
|
||||
String cancelMessage;
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOnReadyHandler(Runnable onReadyHandler) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableAutoInboundFlowControl() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void request(int count) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMessageCompression(boolean enable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(String message, Throwable cause) {
|
||||
cancelled = true;
|
||||
cancelMessage = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T value) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time guarantee that the parameter types still match the
|
||||
// generic bounds — catches a regression where a subclass changes its
|
||||
// request/response types out from under the shared base.
|
||||
@SuppressWarnings("unused")
|
||||
private static void typeBoundsCheck() {
|
||||
MxGatewayStreamSubscription<StreamEventsRequest, MxEvent> a = new MxGatewayEventSubscription();
|
||||
MxGatewayStreamSubscription<StreamAlarmsRequest, AlarmFeedMessage> b = new MxGatewayAlarmFeedSubscription();
|
||||
MxGatewayStreamSubscription<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> c =
|
||||
new MxGatewayActiveAlarmsSubscription();
|
||||
MxGatewayStreamSubscription<WatchDeployEventsRequest, DeployEvent> d = new DeployEventSubscription();
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-24 |
|
||||
| Commit reviewed | `42b0037` |
|
||||
| Status | Re-reviewed |
|
||||
| Open findings | 5 |
|
||||
| Open findings | 0 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -551,7 +551,7 @@ Client.Java-001..031 are unchanged.
|
||||
| Severity | High |
|
||||
| Category | Documentation & comments |
|
||||
| Location | `clients/java/README.md:182-183` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Commit `8738735` ("clients: document StreamAlarms + AcknowledgeAlarm in each README") added two new gradle invocations to the CLI Usage block:
|
||||
|
||||
@@ -569,6 +569,8 @@ A user copying either invocation from the README hits a picocli parse error imme
|
||||
|
||||
**Recommendation:** Drop the `--session-id <id>` token from both documented invocations, and change `--alarm-reference` to `--reference` in the `acknowledge-alarm` line. Optionally also add `--filter-prefix` to the `stream-alarms` example so readers see the scoping option, and align README option names with the actual CLI by either renaming the CLI option `--reference` → `--alarm-reference` (matches the proto `alarm_full_reference` field semantically) or leaving as is and only fixing the README. Add a small `MxGatewayCliTests` parse-only assertion for both subcommands that exercises every option flag to prevent the same drift the next time the CLI surface or README is touched.
|
||||
|
||||
**Resolution:** 2026-05-24 — Confirmed root cause against `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1174-1182,1248-1258`: `StreamAlarmsCommand` exposes only `--filter-prefix` / `--limit` and `AcknowledgeAlarmCommand` exposes `--reference` / `--comment` / `--operator` — neither has a `--session-id` option and `acknowledge-alarm` has no `--alarm-reference` option, so both documented invocations failed picocli parse at the first unknown option. Fixed `clients/java/README.md:182-183` by dropping the `--session-id <id>` token from both lines, replacing it with `--filter-prefix Galaxy` on the `stream-alarms` example so readers see the actual scoping flag, and changing `--alarm-reference` to `--reference` on the `acknowledge-alarm` example. Added `MxGatewayCli.commandLine(...)` to package-private visibility (was `private`) so the test can drive the production picocli `CommandLine` directly without executing the command body. Regression tests in `MxGatewayCliTests`: `readmeDocumentedStreamAlarmsExampleParsesCleanly` and `readmeDocumentedAcknowledgeAlarmExampleParsesCleanly` pin the exact token list documented in the README and assert `commandLine.parseArgs(...)` returns without throwing a `picocli.CommandLine.ParameterException`. TDD red phase: before the README fix the previously-documented tokens (`--session-id <id>` + `--alarm-reference ...`) would have thrown `Unknown option: '--session-id'` / `Unknown option: '--alarm-reference'` at parse time; the new tests pass against the corrected README and would fail the next time someone drifts the documented surface from the actual CLI options.
|
||||
|
||||
### Client.Java-033
|
||||
|
||||
| Field | Value |
|
||||
@@ -576,7 +578,7 @@ A user copying either invocation from the README hits a picocli parse error imme
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1078-1098` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `StreamAlarmsCommand.call()` allocates a bounded `ArrayBlockingQueue<Object>(1024)` and the gRPC observer publishes each `AlarmFeedMessage` via `queue.offer(value)`:
|
||||
|
||||
@@ -594,6 +596,8 @@ The library-side `MxEventStream` (Client.Java-002 resolution) and `DeployEventSt
|
||||
|
||||
**Recommendation:** Either (a) wrap the gRPC observer in the existing `MxEventStream`-style adaptor that calls `subscription.cancel()` and queues an exception on `queue.offer` returning `false`, then surface that exception from the drain loop — mirroring `MxEventStream.observer().onNext`'s overflow branch; or (b) reuse the library-side fail-fast plumbing by promoting `MxEventStream` (or extracting its terminal-state base) into a public `MxAlarmFeedStream` and have `MxGatewayClient.streamAlarms` return that instead of a bare subscription handle. Option (b) lines up with Client.Java-036 (deduplicate the subscription class family). Add a CLI regression test that overflows the bounded queue and asserts a non-zero exit / overflow exception, mirroring `MxGatewayMediumFindingsTests.eventStreamOverflowExceptionSurvivesASubsequentClose`.
|
||||
|
||||
**Resolution:** 2026-05-24 — Confirmed root cause at `MxGatewayCli.java` `StreamAlarmsCommand.call()`: the observer's `onNext` did `queue.offer(value)` and ignored the boolean return, so a 1024-element queue would silently drop messages past capacity. The same silent-drop affected the `onCompleted` branch (which `offer`s `ALARM_FEED_END`) once the queue was full, deadlocking the consumer since the drain loop never sees END. Took option (a) — minimal change that matches `MxEventStream`'s overflow branch. The fix: detect a failed `offer` inside `onNext`, call `subscription.cancel()` (via an `AtomicReference<MxGatewayAlarmFeedSubscription>` published immediately after `client.streamAlarms` returns), `queue.clear()`, then `queue.offer(IllegalStateException("stream-alarms queue overflowed (capacity 1024); consumer too slow"))` followed by `queue.offer(ALARM_FEED_END)`. The existing drain-loop `Throwable`-branch then surfaces the overflow as a thrown `IllegalStateException` from `call()`, which picocli reports as a non-zero CLI exit. Option (b) (promoting `MxEventStream` to a public alarm-feed stream) was considered and rejected for this change — it would change the public SDK surface; Client.Java-036's refactor handles deduplication at the subscription layer instead. Regression test: `MxGatewayCliTests.streamAlarmsCommandFailsFastOnQueueOverflow` — drives an `OverflowingFakeClient` whose `streamAlarms` synchronously pushes 2000 messages to the observer (exceeding the 1024 buffer), then asserts `run.exitCode() != 0`. TDD red phase confirmed deterministically: before the fix the test deadlocked (the buggy `offer` silently dropped both the overflowing alarms AND the `ALARM_FEED_END` sentinel that arrived after the queue filled, so the drain loop's `queue.take()` blocked forever); the background gradle run had to be killed with `TaskStop`. After the fix the same test exits in <1 second with the overflow exception propagating through picocli.
|
||||
|
||||
### Client.Java-034
|
||||
|
||||
| Field | Value |
|
||||
@@ -601,7 +605,7 @@ The library-side `MxEventStream` (Client.Java-002 resolution) and `DeployEventSt
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:182-198` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `BatchCommand.call()` reads one CLI invocation per stdin line and tokenises with:
|
||||
|
||||
@@ -622,6 +626,8 @@ The current `MxGatewayCliTests` test set (`batchCommandExecutesVersionAndEmitsEo
|
||||
|
||||
**Recommendation:** Replace `line.trim().split("\\s+")` with a real shell-style tokeniser that honours single and double quotes and backslash escapes — `picocli.CommandLine.ArgumentParser` doesn't ship one, but Apache Commons Exec's `CommandLine.translateCommandline(String)`, JDK 21's `java.util.spi.ToolProvider` argument parsing, or a small hand-written state machine all work. Cross-check the .NET / Go / Rust / Python `batch` implementations in the same change so all five clients use the same tokenisation; document the contract in the protocol comment in `MxGatewayCli.java` and in `scripts/run-client-e2e-tests.ps1`. Add a CLI test that feeds `acknowledge-alarm --comment "with spaces"` through `batch` and asserts the `--comment` value reaches the gateway as `"with spaces"`.
|
||||
|
||||
**Resolution:** 2026-05-24 — Confirmed root cause: `BatchCommand.call()` at the per-line loop used `line.trim().split("\\s+")` which has no quote handling. Replaced with a new package-private `MxGatewayCli.tokenizeBatchLine(String)` static helper — a hand-rolled POSIX-style shell tokenizer (no new dependency added) that honours: (a) double-quoted runs `"..."` with `\\`, `\"`, and `\n` escapes inside; (b) single-quoted runs `'...'` taken literally with no escapes (POSIX rule); (c) backslash escapes for any single character outside quotes (so `needs\ verification` is one token); (d) whitespace runs outside quotes separate tokens; (e) explicit `IllegalArgumentException` on unterminated quote or trailing backslash so the batch loop surfaces it as a JSON error instead of emitting wrong args. The `BatchCommand` per-line tokenisation now calls `tokenizeBatchLine(line)` and treats an empty-array result as a blank line (skip). Behaviour for whitespace-only input is unchanged. The cross-client `batch` audit (.NET / Go / Rust / Python) is out of scope for this Java-focused finding and tracked separately. Regression tests in `MxGatewayCliTests`: (a) `batchCommandTokenisesDoubleQuotedArgumentWithEmbeddedSpaces` — `--comment "needs verification"` round-trips intact; (b) `batchCommandTokenisesSingleQuotedArgumentWithEmbeddedSpaces` — single-quoted variant; (c) `batchCommandTokenisesBackslashEscapedSpaceOutsideQuotes` — `needs\ verification` outside quotes; (d) `batchCommandPreservesEmptyQuotedArgument` — `""` parses to an empty-string argument; (e) `batchCommandSupportsBackslashEscapedQuoteInsideDoubleQuotes` — `\"inner\"` survives the inner quotes. TDD red phase confirmed: all five tests failed against the original `split("\\s+")` implementation; after the fix all five pass.
|
||||
|
||||
### Client.Java-035
|
||||
|
||||
| Field | Value |
|
||||
@@ -629,7 +635,7 @@ The current `MxGatewayCliTests` test set (`batchCommandExecutesVersionAndEmitsEo
|
||||
| Severity | Low |
|
||||
| Category | Testing coverage |
|
||||
| Location | `clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Commit `8a0c59d` added `MxGatewayClient.streamAlarms(StreamAlarmsRequest, StreamObserver<AlarmFeedMessage>)` and a new public `MxGatewayAlarmFeedSubscription` class. No library-side test exercises either: a grep for `streamAlarms` across `zb-mom-ww-mxgateway-client/src/test/...` returns zero matches. The CLI tests (`MxGatewayCliTests.streamAlarmsCommand*`) exercise the path end-to-end, but they route through a `FakeClient.streamAlarms` override that bypasses the production `subscription.wrap(observer)` glue and the `withStreamDeadline(rawAsyncStub()).streamAlarms(...)` call. A regression to either — forgetting `.wrap(observer)`, dropping the deadline interceptor, misnaming the request — would compile and pass the CLI tests but break against a real gateway.
|
||||
|
||||
@@ -637,6 +643,8 @@ This is the same coverage gap pattern as Client.Java-030 (no fixture test for `Q
|
||||
|
||||
**Recommendation:** Add `streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages` to `MxGatewayClientSessionTests` (in-process gRPC via the existing `InProcessGateway` + `TestGatewayService` fixture): override `TestGatewayService.streamAlarms` to capture the inbound `StreamAlarmsRequest` and emit one `active_alarm` snapshot, one `snapshot_complete`, and one `transition`, then complete. Call `MxGatewayClient.streamAlarms`, drain the observer via a `CountDownLatch`, and assert (a) the server observed the `alarm_filter_prefix`, (b) all three messages arrived in order with the expected payload-case, and (c) `MxGatewayAlarmFeedSubscription.cancel()` aborts the call (latch via `ServerCallStreamObserver.setOnCancelHandler`, mirroring the Client.Java-015 cancellation regression). Optionally also cover the cancel-before-beforeStart race that `MxGatewayAlarmFeedSubscription.wrap` handles, mirroring `mxEventStreamCloseBeforeBeforeStartCancelsStream`.
|
||||
|
||||
**Resolution:** 2026-05-24 — Confirmed the coverage gap: a grep across `zb-mom-ww-mxgateway-client/src/test/...` for `streamAlarms` returned zero matches; the CLI-only test routed through `FakeClient.streamAlarms` which bypassed both the production `subscription.wrap(observer)` and the `withStreamDeadline(rawAsyncStub()).streamAlarms(...)` gRPC call. Added `streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages` to `MxGatewayClientSessionTests` in the same shape as `queryActiveAlarmsForwardsRequestAndStreamsSnapshots` (Client.Java-030 resolved this way). The test overrides `TestGatewayService.streamAlarms` to capture the inbound `StreamAlarmsRequest`, register a `serverCancelled` latch via `(ServerCallStreamObserver<AlarmFeedMessage>) responseObserver).setOnCancelHandler(...)`, then emit three messages: an `active_alarm` snapshot, a `snapshot_complete` sentinel, and a `transition`. It deliberately does NOT call `onCompleted()` so the call remains open for the cancellation assertion. The test then calls `MxGatewayClient.streamAlarms` against the in-process gateway, drains the wrapped observer via a `threeReceived` `CountDownLatch`, and asserts (a) the server observed `alarm_filter_prefix=Tank01`, (b) all three messages arrived in order with the expected payload-case (`ACTIVE_ALARM`, `SNAPSHOT_COMPLETE`, `TRANSITION`) and payload values (`Tank01.Level.HiHi`, transition kind `ACKNOWLEDGE`), and (c) `subscription.cancel()` causes the server's on-cancel handler to fire within 5 s (proves cancellation propagates through the production `subscription.wrap(observer)` glue, not just the CLI fake). TDD red phase: temporarily replaced the production `MxGatewayClient.streamAlarms` body with `withStreamDeadline(rawAsyncStub()).streamAlarms(request, observer);` (dropping the `subscription.wrap(observer)` indirection); the test failed at the `serverCancelled.await` assertion because cancellation was no longer wired to the underlying gRPC call. Restoring the production glue turned the build green.
|
||||
|
||||
### Client.Java-036
|
||||
|
||||
| Field | Value |
|
||||
@@ -644,7 +652,7 @@ This is the same coverage gap pattern as Client.Java-030 (no fixture test for `Q
|
||||
| Severity | Low |
|
||||
| Category | Code organization & conventions |
|
||||
| Location | `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java`, `MxGatewayEventSubscription.java`, `MxGatewayActiveAlarmsSubscription.java`, `DeployEventSubscription.java` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `MxGatewayAlarmFeedSubscription` is a structural near-copy of `MxGatewayEventSubscription` — same `AtomicReference<ClientCallStreamObserver<…>>` + `AtomicBoolean cancelled` field shape, the same `wrap(observer)` returning a `ClientResponseObserver` that stores `requestStream` in `beforeStart`, the same close-before-beforeStart race handling that Client.Java-014 originally fixed for `MxEventStream`, and the same `cancel()`+`close()` idempotency contract. The four subscription classes (`MxGatewayEventSubscription`, `MxGatewayActiveAlarmsSubscription`, `MxGatewayAlarmFeedSubscription`, `DeployEventSubscription`) are now ~60-line near-clones differing only in the request/response generic parameters and the `cancel` message string.
|
||||
|
||||
@@ -652,4 +660,6 @@ This is the same maintenance-hazard pattern Client.Java-009 / Client.Java-016 id
|
||||
|
||||
**Recommendation:** Extract a package-private abstract base, e.g. `MxGatewayStreamSubscription<TRequest>`, holding the `AtomicReference` / `AtomicBoolean` pair, the `cancel()` / `close()` implementation, and a `ClientResponseObserver` factory parameterised by the cancel-message string and the response observer. Have all four subscription classes extend it. Behaviour-only refactor — no public API change, existing tests cover the contract.
|
||||
|
||||
**Resolution:** 2026-05-24 — Extracted a package-private abstract base `MxGatewayStreamSubscription<TRequest, TResponse> implements AutoCloseable` (new file `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscription.java`). It holds the shared `AtomicReference<ClientCallStreamObserver<TRequest>>` and `AtomicBoolean cancelled` pair, the `wrap(StreamObserver<TResponse>)` factory that returns a `ClientResponseObserver` with the Client.Java-014 close-before-beforeStart fix baked in, the `cancel()` / `close()` implementation, and an immutable `cancelMessage` injected by the subclass constructor. The four prior 60-line near-clones (`MxGatewayEventSubscription`, `MxGatewayAlarmFeedSubscription`, `MxGatewayActiveAlarmsSubscription`, `DeployEventSubscription`) collapse to ~10-line subclasses that only declare their `<Request, Response>` type parameters and supply the cancel-message string to `super(...)`. Public API surface is preserved: each subclass remains a `public final class` with a public no-arg constructor (the constructor was implicit on the original classes; I made it explicit `public` on the subclasses so the existing CLI `FakeClient.streamAlarms` in a different package can still `new MxGatewayAlarmFeedSubscription()`). The `wrap(...)` method is `final` and package-private on the base — same accessibility the four subclasses had before — so production callers in `MxGatewayClient`/`GalaxyRepositoryClient` see no change. New test file `MxGatewayStreamSubscriptionContractTests` exercises the lifecycle/cancellation contract identically across all four subclasses (16 tests, four per scenario): (a) cancel-before-beforeStart eagerly cancels the stream once it attaches with the subclass-specific message, (b) cancel-after-beforeStart forwards directly to the stream, (c) `close()` delegates to `cancel()`, (d) the wrapped observer forwards `onNext`/`onError`/`onCompleted` verbatim, and a compile-time `typeBoundsCheck` helper that asserts each subclass still binds its `<Req, Resp>` parameters to the right proto types. TDD red phase confirmed: temporarily breaking one subclass's `super(...)` message to `"BROKEN MESSAGE"` made the contract test for that subclass fail with `expected: <client cancelled alarm feed> but was: <BROKEN MESSAGE>`; restoring the correct value turned all 16 contract tests green. Future fixes to the shared lifecycle now live in one place — the next Client.Java-014/021-style race fix cannot drift across the four classes.
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-24 |
|
||||
| Commit reviewed | `42b0037` |
|
||||
| Status | Re-reviewed |
|
||||
| Open findings | 1 |
|
||||
| Open findings | 0 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -494,7 +494,7 @@ The Write parity test (IntegrationTests-012's resolution) added exactly this ass
|
||||
| Severity | Low |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.IntegrationTests/IntegrationTestEnvironmentTests.cs:57-84` (`ResolveRepositoryRoot_NoMarkers_ThrowsInvalidOperationExceptionNamingStartAndMarkers`) |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The new regression test for IntegrationTests-022 builds an "isolated" start directory under `Path.GetTempPath()` (e.g. `C:\Users\<user>\AppData\Local\Temp\<random>\nested` on Windows) and calls `ResolveRepositoryRoot(isolatedStart)`, asserting an `InvalidOperationException` is thrown. The walker walks every parent — `<random>`, `Temp`, `Local`, `AppData`, `<user>`, `Users`, `C:\` — and stops only when it either finds a repository root marker or runs out of parents. The test silently assumes none of those ancestor directories satisfies `IsRepositoryRoot` (a `src/` subdirectory next to `.git` / `*.sln` / `*.slnx`). The assumption is environment-dependent:
|
||||
|
||||
@@ -504,3 +504,5 @@ The Write parity test (IntegrationTests-012's resolution) added exactly this ass
|
||||
The current dev box layout (`C:\Users\dohertj2\Desktop\mxaccessgw`) is safe because Temp is at `C:\Users\dohertj2\AppData\Local\Temp` and the walker exits at `C:\` without ever encountering `src/`. The fragility is invisible on this machine and only surfaces if the test ever runs in CI / on a contributor box with a less hermetic file-system layout.
|
||||
|
||||
**Recommendation:** Isolate the walker from any ambient ancestor by either (a) constructing an `isolatedRoot` directly under a drive root and pointing the walker at a chain entirely under it (e.g. create `<isolatedRoot>\level1\level2\level3` and start the walk at `level3`, then assert the throw — the walker stops at the drive root regardless of what is on it), (b) refactoring `ResolveRepositoryRoot` to accept an injectable `stopBoundary` parameter for tests and pass `isolatedRoot` as the boundary, or (c) replacing the `Assert.Throws` shape with an explicit upward-walk check that the test owns. Option (a) is the smallest change: prepend a sentinel — e.g. create a dummy `<isolatedRoot>\sentinel-no-markers` and assert nothing about Temp ancestors — and pass the test only when the walker reaches that sentinel without finding a marker. The current shape is acceptable on the documented dev box but should not be the sole regression coverage for IntegrationTests-022.
|
||||
|
||||
**Resolution:** Resolved 2026-05-24 — Took option (b) (inject a stop-boundary) because option (a) does not actually solve the leak: a sentinel chain under `Path.GetTempPath()` still leaves the walker free to ascend past it into Temp / AppData / Users / C:\, so any ambient ancestor with `src/` + `.git`/`.sln`/`.slnx` still wins. Added an optional `stopBoundary` parameter to `IntegrationTestEnvironment.ResolveRepositoryRoot(string startDirectory, string? stopBoundary = null)`. When supplied, the walker checks the boundary for markers and then stops, refusing to ascend past it; production callers (the `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` resolution path) continue to pass `null` so the walk to drive-root behavior is unchanged. Updated both existing tests (`ResolveRepositoryRoot_AcceptsGitWorktreeFile` and `ResolveRepositoryRoot_NoMarkers_ThrowsInvalidOperationExceptionNamingStartAndMarkers`) to pass their owned temp directory as the boundary, sealing the walker inside a chain the test fully controls. Added a new regression test `ResolveRepositoryRoot_StopBoundary_IsolatesWalkerFromAmbientAncestorMarkers` that deliberately constructs an outer marker-bearing ancestor (`outerRoot/src` + `outerRoot/.git`), an inner boundary, and an isolated start beneath the boundary; first asserts that without the boundary the walker leaks up to `outerRoot` (the precise IntegrationTests-025 failure mode), then asserts that *with* the boundary the same call throws — proving the boundary is the load-bearing isolation. TDD red/green confirmed: the new regression test fails against the pre-fix walker (`Assert.Throws() Failure: No exception was thrown`) and passes once the boundary handling is restored. Re-ran the full `IntegrationTestEnvironmentTests` slice with `TMP` / `TEMP` redirected under a deliberately constructed `<temp>\fake-repo-ancestor` directory carrying `src/` and a `.git` file — the original flake repro from the finding — and confirmed all 5 tests pass (the same redirection produced `Assert.Throws() Failure` on the pre-fix code). Build: 0 warnings / 0 errors.
|
||||
|
||||
+15
-16
@@ -12,13 +12,13 @@ Each module's `findings.md` is the source of truth; this file is generated from
|
||||
|---|---|---|---|---|---|---|
|
||||
| [Client.Dotnet](Client.Dotnet/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 21 |
|
||||
| [Client.Go](Client.Go/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 27 |
|
||||
| [Client.Java](Client.Java/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 5 | 36 |
|
||||
| [Client.Java](Client.Java/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 36 |
|
||||
| [Client.Python](Client.Python/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 26 |
|
||||
| [Client.Rust](Client.Rust/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 29 |
|
||||
| [Contracts](Contracts/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 17 |
|
||||
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 1 | 25 |
|
||||
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 25 |
|
||||
| [Server](Server/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 50 |
|
||||
| [Tests](Tests/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 5 | 31 |
|
||||
| [Tests](Tests/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 31 |
|
||||
| [Worker](Worker/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 25 |
|
||||
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-24 | `42b0037` | Re-reviewed | 0 | 30 |
|
||||
|
||||
@@ -26,19 +26,7 @@ Each module's `findings.md` is the source of truth; this file is generated from
|
||||
|
||||
Findings with status `Open` or `In Progress`, ordered by severity.
|
||||
|
||||
| ID | Severity | Category | Location | Description |
|
||||
|---|---|---|---|---|
|
||||
| Client.Java-032 | High | Documentation & comments | `clients/java/README.md:182-183` | Commit `8738735` ("clients: document StreamAlarms + AcknowledgeAlarm in each README") added two new gradle invocations to the CLI Usage block: ``` gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-ke… |
|
||||
| Client.Java-033 | Medium | Correctness & logic bugs | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1078-1098` | `StreamAlarmsCommand.call()` allocates a bounded `ArrayBlockingQueue<Object>(1024)` and the gRPC observer publishes each `AlarmFeedMessage` via `queue.offer(value)`: ``` BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024); … @Over… |
|
||||
| Client.Java-034 | Medium | Correctness & logic bugs | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:182-198` | `BatchCommand.call()` reads one CLI invocation per stdin line and tokenises with: ``` String[] args = line.trim().split("\\s+"); … int exitCode = cmd.execute(args); ``` `split("\\s+")` does no shell-quoting parsing — it just splits on whit… |
|
||||
| Tests-027 | Medium | Concurrency & thread safety | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs:199-240`, `src/ZB.MOM.WW.MxGateway.Server/Metrics/GatewayMetrics.cs:8,73,246-251` | The review brief explicitly flagged `MxAccessGatewayServiceTests.StreamEvents_WhenEventIsWritten_RecordsSendDuration` as a known flake that "passed solo on rerun". The root cause is the `MeterListener` subscribes by `instrument.Meter.Name… |
|
||||
| Client.Java-035 | Low | Testing coverage | `clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java` | Commit `8a0c59d` added `MxGatewayClient.streamAlarms(StreamAlarmsRequest, StreamObserver<AlarmFeedMessage>)` and a new public `MxGatewayAlarmFeedSubscription` class. No library-side test exercises either: a grep for `streamAlarms` across `… |
|
||||
| Client.Java-036 | Low | Code organization & conventions | `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java`, `MxGatewayEventSubscription.java`, `MxGatewayActiveAlarmsSubscription.java`, `DeployEventSubscription.java` | `MxGatewayAlarmFeedSubscription` is a structural near-copy of `MxGatewayEventSubscription` — same `AtomicReference<ClientCallStreamObserver<…>>` + `AtomicBoolean cancelled` field shape, the same `wrap(observer)` returning a `ClientResponse… |
|
||||
| IntegrationTests-025 | Low | Correctness & logic bugs | `src/ZB.MOM.WW.MxGateway.IntegrationTests/IntegrationTestEnvironmentTests.cs:57-84` (`ResolveRepositoryRoot_NoMarkers_ThrowsInvalidOperationExceptionNamingStartAndMarkers`) | The new regression test for IntegrationTests-022 builds an "isolated" start directory under `Path.GetTempPath()` (e.g. `C:\Users\<user>\AppData\Local\Temp\<random>\nested` on Windows) and calls `ResolveRepositoryRoot(isolatedStart)`, asser… |
|
||||
| Tests-028 | Low | Testing coverage | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs:466-496,802-807`, `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs:216-253` | The new `KillWorkerAsync_KillsWorkerAndRemovesSession` (line 466) and `KillWorkerAsync_WhenSessionMissing_ThrowsSessionNotFound` (line 486) pin the new kill-path entry, but they do not pin the `reason` argument propagating through the chai… |
|
||||
| Tests-029 | Low | Error handling & resilience | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSessionAdminServiceTests.cs:61-106,139-222`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardSessionAdminService.cs:77-125` | The new `DashboardSessionAdminServiceTests` covers the happy path and the viewer-denial path for both `CloseSessionAsync` and `KillWorkerAsync`, plus `CloseSessionAsync_WhenSessionMissing_ReportsFriendlyError` for the close-side `SessionNo… |
|
||||
| Tests-030 | Low | Testing coverage | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardApiKeyManagementServiceTests.cs:115-163`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardApiKeyManagementService.cs:146-177` | The three new `DeleteAsync_*` fixtures cover unauthorised user, success path with audit, and store-refuses-with-friendly-error. They do not exercise two production behaviours: (1) `DeleteAsync_WhenStoreRefuses_ReportsFriendlyError` (line 1… |
|
||||
| Tests-031 | Low | Concurrency & thread safety | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs:22-61` | `ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay` records `startedAt = DateTimeOffset.UtcNow` *before* calling `publisher.StartAsync(...)`, then asserts `secondSubscribeAt - startedAt >= reconnectDelay - 10ms` (line 59). Th… |
|
||||
_No pending findings._
|
||||
|
||||
## Closed findings
|
||||
|
||||
@@ -49,6 +37,7 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Server-001 | Critical | Resolved | Security | `src/MxGateway.Server/GatewayApplication.cs:147-149`, `src/MxGateway.Server/Dashboard/DashboardEndpointRouteBuilderExtensions.cs:55-58`, `src/MxGateway.Server/Dashboard/Components/Routes.razor:1-15` |
|
||||
| Client.Go-001 | High | Resolved | Correctness & logic bugs | `clients/go/mxgateway/errors.go:88-93`, `clients/go/mxgateway/errors.go:117-128` |
|
||||
| Client.Java-013 | High | Resolved | Testing coverage | `clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java:212-304`, `clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java:1214-1244` |
|
||||
| Client.Java-032 | High | Resolved | Documentation & comments | `clients/java/README.md:182-183` |
|
||||
| Client.Python-018 | High | Resolved | Code organization & conventions | `clients/python/pyproject.toml:11` |
|
||||
| Client.Python-022 | High | Resolved | Documentation & comments | `clients/python/README.md:201-202`, `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:389-420` |
|
||||
| Client.Rust-001 | High | Resolved | mxaccessgw conventions | `clients/rust/src/options.rs:98,143` |
|
||||
@@ -86,6 +75,8 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Client.Java-021 | Medium | Resolved | Concurrency & thread safety | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/DeployEventStream.java:96-135` |
|
||||
| Client.Java-027 | Medium | Resolved | Documentation & comments | `clients/java/README.md:36,107-175,185,205,220`, `clients/java/JavaClientDesign.md:195-211` |
|
||||
| Client.Java-028 | Medium | Resolved | Documentation & comments | `clients/java/JavaClientDesign.md:23-27` |
|
||||
| Client.Java-033 | Medium | Resolved | Correctness & logic bugs | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1078-1098` |
|
||||
| Client.Java-034 | Medium | Resolved | Correctness & logic bugs | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:182-198` |
|
||||
| Client.Python-003 | Medium | Resolved | Error handling & resilience | `clients/python/src/mxgateway/client.py:125-137,155-173` |
|
||||
| Client.Python-005 | Medium | Resolved | Performance & resource management | `clients/python/src/mxgateway/galaxy.py:117-140` |
|
||||
| Client.Python-009 | Medium | Resolved | Testing coverage | `clients/python/tests/` |
|
||||
@@ -130,6 +121,7 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Tests-016 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Galaxy/GalaxyHierarchyCacheTests.cs:29-41,115-124` |
|
||||
| Tests-020 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceConstraintTests.cs:275-347`, `src/MxGateway.Server/Grpc/MxAccessGatewayService.cs:803-829` |
|
||||
| Tests-026 | Medium | Resolved | Testing coverage | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs`, `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:123-126` |
|
||||
| Tests-027 | Medium | Resolved | Concurrency & thread safety | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs:199-240`, `src/ZB.MOM.WW.MxGateway.Server/Metrics/GatewayMetrics.cs:8,73,246-251` |
|
||||
| Worker-004 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:565-588` |
|
||||
| Worker-005 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:205-258` (production alarm poll loop) |
|
||||
| Worker-006 | Medium | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:117-124`, `src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs:386-491` |
|
||||
@@ -205,6 +197,8 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Client.Java-029 | Low | Resolved | Documentation & comments | `clients/java/README.md:208-209` |
|
||||
| Client.Java-030 | Low | Resolved | Testing coverage | `clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/` |
|
||||
| Client.Java-031 | Low | Resolved | mxaccessgw conventions | `clients/java/README.md:13,17,26` |
|
||||
| Client.Java-035 | Low | Resolved | Testing coverage | `clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java` |
|
||||
| Client.Java-036 | Low | Resolved | Code organization & conventions | `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java`, `MxGatewayEventSubscription.java`, `MxGatewayActiveAlarmsSubscription.java`, `DeployEventSubscription.java` |
|
||||
| Client.Python-001 | Low | Resolved | Documentation & comments | `clients/python/pyproject.toml:8,25`, `clients/python/src/mxgateway_cli/commands.py:25` |
|
||||
| Client.Python-002 | Low | Resolved | Code organization & conventions | `clients/python/src/mxgateway/__init__.py:27` |
|
||||
| Client.Python-004 | Low | Resolved | Correctness & logic bugs | `clients/python/src/mxgateway_cli/commands.py:386,402-404` |
|
||||
@@ -268,6 +262,7 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| IntegrationTests-022 | Low | Resolved | Code organization & conventions | `src/ZB.MOM.WW.MxGateway.IntegrationTests/IntegrationTestEnvironment.cs:103-138` (`ResolveRepositoryRoot` / `IsRepositoryRoot`) |
|
||||
| IntegrationTests-023 | Low | Resolved | Testing coverage | `src/ZB.MOM.WW.MxGateway.IntegrationTests/DashboardLdapLiveTests.cs:14-29` |
|
||||
| IntegrationTests-024 | Low | Resolved | Code organization & conventions | `src/ZB.MOM.WW.MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` (`NullDashboardEventBroadcaster` private class at end of file) |
|
||||
| IntegrationTests-025 | Low | Resolved | Correctness & logic bugs | `src/ZB.MOM.WW.MxGateway.IntegrationTests/IntegrationTestEnvironmentTests.cs:57-84` (`ResolveRepositoryRoot_NoMarkers_ThrowsInvalidOperationExceptionNamingStartAndMarkers`) |
|
||||
| Server-007 | Low | Resolved | Performance & resource management | `src/MxGateway.Server/Galaxy/GalaxyHierarchyProjector.cs:55-70` |
|
||||
| Server-008 | Low | Resolved | Performance & resource management | `src/MxGateway.Server/Grpc/GalaxyRepositoryGrpcService.cs:111-134,160-189` |
|
||||
| Server-009 | Low | Resolved | Error handling & resilience | `src/MxGateway.Server/Security/Authentication/AuthSqliteConnectionFactory.cs:15-32` |
|
||||
@@ -318,6 +313,10 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
| Tests-023 | Low | Resolved | Correctness & logic bugs | `src/MxGateway.Tests/Gateway/Sessions/SessionWorkerClientFactoryFakeWorkerTests.cs:334-374` |
|
||||
| Tests-024 | Low | Resolved | Testing coverage | `src/MxGateway.Server/Grpc/MxAccessGatewayService.cs:713-730,784-801,859-876`, `src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceConstraintTests.cs` |
|
||||
| Tests-025 | Low | Resolved | Code organization & conventions | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:285-289`, `src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs:417-421` |
|
||||
| Tests-028 | Low | Resolved | Testing coverage | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs:466-496,802-807`, `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs:216-253` |
|
||||
| Tests-029 | Low | Resolved | Error handling & resilience | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSessionAdminServiceTests.cs:61-106,139-222`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardSessionAdminService.cs:77-125` |
|
||||
| Tests-030 | Low | Resolved | Testing coverage | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardApiKeyManagementServiceTests.cs:115-163`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardApiKeyManagementService.cs:146-177` |
|
||||
| Tests-031 | Low | Resolved | Concurrency & thread safety | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs:22-61` |
|
||||
| Worker-009 | Low | Resolved | Performance & resource management | `src/MxGateway.Worker/Ipc/WorkerFrameReader.cs:31,49`, `src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs:57-58` |
|
||||
| Worker-010 | Low | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Conversion/VariantConverter.cs:204-226` |
|
||||
| Worker-011 | Low | Resolved | Correctness & logic bugs | `src/MxGateway.Worker/Ipc/WorkerPipeClient.cs:169-171` |
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-24 |
|
||||
| Commit reviewed | `42b0037` |
|
||||
| Status | Re-reviewed |
|
||||
| Open findings | 5 |
|
||||
| Open findings | 0 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -488,12 +488,14 @@ The cancellation tests for `WorkerClient` in `WorkerClientTests` *do* exercise t
|
||||
| Severity | Medium |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs:199-240`, `src/ZB.MOM.WW.MxGateway.Server/Metrics/GatewayMetrics.cs:8,73,246-251` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The review brief explicitly flagged `MxAccessGatewayServiceTests.StreamEvents_WhenEventIsWritten_RecordsSendDuration` as a known flake that "passed solo on rerun". The root cause is the `MeterListener` subscribes by `instrument.Meter.Name == GatewayMetrics.MeterName` (a *process-shared* constant `"MxGateway.Server"`), not by the specific `GatewayMetrics` instance constructed in the test. Tests-012 made the xUnit parallelism policy explicit (`parallelizeTestCollections: true`, `maxParallelThreads: -1`), and every other test that builds its own `GatewayMetrics()` and exercises `MxAccessGatewayService.StreamEvents` or `EventStreamService.StreamEventsAsync` (e.g. the new `StreamEventsAsync_*` family added by Tests-026 and Server-041, plus the pre-existing `StreamEventsAsync_YieldsEventsInWorkerOrder` etc.) routes through `GatewayMetrics.RecordEventStreamSend` → the same histogram name `mxgateway.events.stream_send.duration`. When two such tests run concurrently in the same xUnit process, the `MeterListener` in this test sees measurements from *both* meters and `families.Count` grows to >1, breaking `Assert.Equal([MxEventFamily.OnDataChange.ToString()], families)`. Solo reruns pass because no other producer is alive. This is exactly the cross-test mutable-state pattern Tests-012 set the guardrail comment against.
|
||||
|
||||
**Recommendation:** Either (a) filter the `MeterListener` callback by the specific `Meter` instance — capture `metrics._meter` (or expose `GatewayMetrics.Meter`) and compare with `ReferenceEquals(instrument.Meter, expectedMeter)` instead of comparing `Meter.Name`; or (b) place this test in a single-threaded `[Collection("GatewayMetrics-Listener")]` so no other `RecordEventStreamSend` producer runs concurrently. Option (a) is preferred because it removes the cross-talk vector permanently and lets the test stay parallelisable.
|
||||
|
||||
**Resolution:** 2026-05-24 — Applied option (a). Added an `internal Meter Meter => _meter;` accessor on `GatewayMetrics` (visible to the Tests project via the existing `InternalsVisibleTo`) and changed both the `InstrumentPublished` filter and the `SetMeasurementEventCallback<double>` filter in `StreamEvents_WhenEventIsWritten_RecordsSendDuration` from `instrument.Meter.Name == GatewayMetrics.MeterName` to `ReferenceEquals(instrument.Meter, metrics.Meter)`. Added a companion regression `StreamEvents_RecordSendDurationListener_IgnoresMeasurementsFromOtherMetersWithSameName` that constructs a second `GatewayMetrics`, records an `OnWriteComplete` measurement on it before the test-under-test publishes, and asserts the listener captures only the test-under-test's `OnDataChange` family. Confirmed the regression catches the original `Meter.Name`-only filter (got `["OnWriteComplete", "OnDataChange"]` for `["OnDataChange"]`) by temporarily reverting the filter shape; restored ReferenceEquals after. Suite green 3/3 (512/512); the two Tests-027 tests pass 5/5 solo. The cross-talk vector is permanently closed without giving up parallelism.
|
||||
|
||||
### Tests-028
|
||||
|
||||
| Field | Value |
|
||||
@@ -501,12 +503,14 @@ The cancellation tests for `WorkerClient` in `WorkerClientTests` *do* exercise t
|
||||
| Severity | Low |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs:466-496,802-807`, `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs:216-253` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The new `KillWorkerAsync_KillsWorkerAndRemovesSession` (line 466) and `KillWorkerAsync_WhenSessionMissing_ThrowsSessionNotFound` (line 486) pin the new kill-path entry, but they do not pin the `reason` argument propagating through the chain. `SessionManager.KillWorkerAsync(sessionId, reason, ct)` validates `reason` with `ArgumentException.ThrowIfNullOrWhiteSpace(reason)` (line 221), calls `session.KillWorker(reason)` (line 229), and logs `reason={Reason}` (line 251); but the `FakeWorkerClient.Kill(string reason)` discards the argument (line 803-807) and the assertion is only `Assert.Equal(1, workerClient.KillCount)`. A regression that (a) hard-coded an internal `"unspecified"` reason between `SessionManager` and `GatewaySession`, (b) swapped to a different overload that dropped the reason, or (c) deleted the `ThrowIfNullOrWhiteSpace` guard would all pass the current tests. The dashboard caller (`DashboardSessionAdminService.KillWorkerAsync`) passes a hard-coded `"dashboard-admin-kill"` reason and the only test that observes it (`KillWorkerAsync_AdminKillsWorker`) asserts `!string.IsNullOrWhiteSpace(LastKillReason)` rather than pinning the value — so the same-class drift is also untested.
|
||||
|
||||
**Recommendation:** (1) Capture `LastKillReason` on `FakeWorkerClient.Kill` and assert `KillWorkerAsync_KillsWorkerAndRemovesSession` propagates the test-supplied `"test-kill"` string end-to-end. (2) Add `KillWorkerAsync_WithBlankReason_ThrowsArgumentException` (parameterised over `null`, `""`, `" "`) to pin the `ArgumentException.ThrowIfNullOrWhiteSpace` guard. (3) Tighten `DashboardSessionAdminServiceTests.KillWorkerAsync_AdminKillsWorker` to `Assert.Equal("dashboard-admin-kill", sessionManager.LastKillReason)` so a future reason-string change is a deliberate test update.
|
||||
|
||||
**Resolution:** 2026-05-24 — Added `LastKillReason` to `FakeWorkerClient` in `SessionManagerTests.cs` and set it inside `Kill(string reason)`. Tightened `KillWorkerAsync_KillsWorkerAndRemovesSession` to assert `workerClient.LastKillReason == "test-kill"`, pinning the end-to-end propagation from `SessionManager.KillWorkerAsync` → `session.KillWorker(reason)` → `IWorkerClient.Kill(reason)`. Added `KillWorkerAsync_WithBlankReason_ThrowsArgumentException` as a `[Theory]` over `""`, `" "`, `"\t"` plus a separate `KillWorkerAsync_WithNullReason_ThrowsArgumentNullException` fact (xUnit `InlineData` cannot carry `null` for a non-nullable string, and `ArgumentException.ThrowIfNullOrWhiteSpace` throws `ArgumentNullException` for `null`). Both new tests confirm `KillCount == 0` and the session remains registered, proving the guard fires before any lookup or worker call. Tightened `DashboardSessionAdminServiceTests.KillWorkerAsync_AdminKillsWorker` to `Assert.Equal("dashboard-admin-kill", sessionManager.LastKillReason)`. All affected tests pass; suite green.
|
||||
|
||||
### Tests-029
|
||||
|
||||
| Field | Value |
|
||||
@@ -514,12 +518,16 @@ The cancellation tests for `WorkerClient` in `WorkerClientTests` *do* exercise t
|
||||
| Severity | Low |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSessionAdminServiceTests.cs:61-106,139-222`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardSessionAdminService.cs:77-125` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The new `DashboardSessionAdminServiceTests` covers the happy path and the viewer-denial path for both `CloseSessionAsync` and `KillWorkerAsync`, plus `CloseSessionAsync_WhenSessionMissing_ReportsFriendlyError` for the close-side `SessionNotFound` catch — but the kill-side error branches are not tested. The product code's `KillWorkerAsync` (lines 111-114) has the same `SessionNotFound` catch returning `"Session {id} was not found."` and (lines 115-124) a generic `SessionManagerException` catch returning `"Kill failed: {message}"`; neither is exercised. The fake's `KillWorkerAsync` (lines 200-209) only succeeds — there is no `KillThrowsNotFound` / `KillThrowsGeneric` configuration option matching the existing `CloseThrowsNotFound`. Symmetrically, `CloseSessionAsync` has the same `IsNullOrWhiteSpace(sessionId)` guard (line 37-40) but no blank-id test even though `KillWorkerAsync_BlankSessionId_ReturnsFailure` exists for the parallel kill guard — a guard-removal regression on close would slip through.
|
||||
|
||||
**Recommendation:** Mirror the existing close-side fixtures onto the kill side: add `KillThrowsNotFound` / `KillThrowsGeneric` init-flags to the `FakeSessionManager`, then `KillWorkerAsync_WhenSessionMissing_ReportsFriendlyError`, `KillWorkerAsync_WhenSessionManagerThrows_ReportsKillFailedMessage`, and `CloseSessionAsync_BlankSessionId_ReturnsFailure`. These are mechanical copies of the existing patterns and bring close/kill coverage into symmetry.
|
||||
|
||||
**Re-triage note:** The Server batch already added `CloseSessionAsync_WhenManagerThrowsUnexpected_ReturnsFriendlyFail` and `KillWorkerAsync_WhenManagerThrowsUnexpected_ReturnsFriendlyFail` (the Server-050 regressions visible at HEAD lines 125-162 of the test file), so the kill-side `SessionManagerException` general-catch branch and the close-side parallel are both covered there in a generic-exception shape. The only remaining asymmetry was the blank-session-id guard, per the prompt scope.
|
||||
|
||||
**Resolution:** 2026-05-24 — Added `CloseSessionAsync_BlankSessionId_ReturnsFailure` to `DashboardSessionAdminServiceTests`. The new test invokes `service.CloseSessionAsync(adminUser, " ", ct)` and asserts `Succeeded == false` and `sessionManager.CloseCount == 0`, pinning the `string.IsNullOrWhiteSpace(sessionId)` guard at `DashboardSessionAdminService.cs:52-55`. This brings close/kill blank-id coverage into symmetry with the existing `KillWorkerAsync_BlankSessionId_ReturnsFailure`. The `KillThrowsNotFound` / `KillThrowsGeneric` extensions from the original recommendation are not needed because the unexpected-throw branches are already covered by the Server-050 regressions noted above. All tests pass; suite green.
|
||||
|
||||
### Tests-030
|
||||
|
||||
| Field | Value |
|
||||
@@ -527,12 +535,14 @@ The cancellation tests for `WorkerClient` in `WorkerClientTests` *do* exercise t
|
||||
| Severity | Low |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardApiKeyManagementServiceTests.cs:115-163`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardApiKeyManagementService.cs:146-177` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The three new `DeleteAsync_*` fixtures cover unauthorised user, success path with audit, and store-refuses-with-friendly-error. They do not exercise two production behaviours: (1) `DeleteAsync_WhenStoreRefuses_ReportsFriendlyError` (line 151-163) does not construct or inject a `FakeApiKeyAuditStore`, so it never observes that the product code still emits an audit entry with `EventType = "dashboard-delete-key"` and `Details = "not-found-or-active"` on the failure branch (`AppendAuditAsync` runs unconditionally at line 167-172). A regression that placed the `AppendAuditAsync` call inside the `if (deleted)` branch would silently drop the audit trail for refused deletes — a real audit-completeness gap. (2) There is no `DeleteAsync_BlankKeyId_ReturnsFailure` or `DeleteAsync_InvalidKeyId_ReturnsFailure` test, even though `ValidateKeyId(keyId)` (line 156-160) guards on the same conditions as Create/Revoke/Rotate. The `Revoke`/`Rotate` paths have equivalent fixtures (the file's earlier tests cover them); only Delete is missing them.
|
||||
|
||||
**Recommendation:** (1) Add a `FakeApiKeyAuditStore` to `DeleteAsync_WhenStoreRefuses_ReportsFriendlyError` and assert it contains exactly one entry with `EventType == "dashboard-delete-key"` and `Details == "not-found-or-active"`. (2) Add `DeleteAsync_BlankKeyId_ReturnsFailure` (parameterised over `null`, `""`, `" "`) and `DeleteAsync_InvalidKeyId_ReturnsFailure` (a keyId with characters the `ValidateKeyId` rules reject) to pin the validation branch end-to-end.
|
||||
|
||||
**Resolution:** 2026-05-24 — Renamed `DeleteAsync_WhenStoreRefuses_ReportsFriendlyError` to `DeleteAsync_WhenStoreRefuses_ReportsFriendlyErrorAndAudits` and extended it to inject a `FakeApiKeyAuditStore`; the test now asserts the single audit entry has `EventType == "dashboard-delete-key"`, `KeyId == "operator01"`, and `Details == "not-found-or-active"`. This pins the unconditional-audit invariant at `DashboardApiKeyManagementService.cs:167-172` — a regression moving the `AppendAuditAsync` call inside `if (deleted)` would now fail the test. Added `DeleteAsync_BlankKeyId_ReturnsFailure` as a `[Theory]` over `""`, `" "`, `"\t"` that asserts `Succeeded == false`, `adminStore.DeleteCount == 0`, AND `auditStore.Entries` is empty — pinning that the `ValidateKeyId` guard at line 156-160 fires before any store or audit work. All tests pass; suite green.
|
||||
|
||||
### Tests-031
|
||||
|
||||
| Field | Value |
|
||||
@@ -540,8 +550,10 @@ The cancellation tests for `WorkerClient` in `WorkerClientTests` *do* exercise t
|
||||
| Severity | Low |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs:22-61` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay` records `startedAt = DateTimeOffset.UtcNow` *before* calling `publisher.StartAsync(...)`, then asserts `secondSubscribeAt - startedAt >= reconnectDelay - 10ms` (line 59). The measured gap is *not* the reconnect delay in isolation — it is `(StartAsync scheduling) + (first WatchSnapshotsAsync setup + Task.Yield) + (throw) + reconnect delay + (second WatchSnapshotsAsync setup)`. On a slow/contended CI agent the first three terms easily dominate (favouring the assertion); but on a fast machine, Windows `Task.Delay(50ms)` rounds up to the next ~15.6 ms tick boundary and may return at ~46-50 ms relative to schedule, while the first three terms can be sub-millisecond — so the gap measurement can land within 1-2 ms of the lower bound, and the 10 ms slack may not absorb a single missed quantum. This is a latent flake of the same flavour as Tests-006 (heartbeat timing) but on a wall-clock dependency the test cannot inject around because `DashboardSnapshotPublisher` uses `Task.Delay(_reconnectDelay)` directly. Tests-006 / Tests-017 moved heartbeat tests onto `ManualTimeProvider`; this test cannot do that without a product change to use a `TimeProvider`-aware delay.
|
||||
|
||||
**Recommendation:** (a) The cheap fix: have `ThrowOnceThenYieldSnapshotService` record `_firstThrowAt = DateTimeOffset.UtcNow` immediately before the `throw`, and change the assertion to `secondSubscribeAt - firstThrowAt >= reconnectDelay - 10ms` — the gap then measures only the reconnect delay, eliminating the variable scheduling baseline. (b) The deeper fix: extend `DashboardSnapshotPublisher` to accept an `ITimeProvider`-style delay seam (or a virtual `DelayAsync` hook) so a `ManualTimeProvider` could advance time deterministically. (a) is preferred for now; (b) belongs as a follow-up if more reconnect-loop tests are added.
|
||||
|
||||
**Resolution:** 2026-05-24 — Applied option (a). Added `FirstThrowAt` to `ThrowOnceThenYieldSnapshotService` and set it via `FirstThrowAt = DateTimeOffset.UtcNow;` immediately before the first-call `throw`. Removed the pre-`StartAsync` `startedAt` baseline; the assertion now reads `gap = secondSubscribeAt - firstThrowAt` (both timestamps captured inside the fake), and the 10 ms slack absorbs the Windows `Task.Delay` quantum without the variable `StartAsync` / scheduling overhead in the baseline. This is the same flake-isolation pattern Tests-006 / Tests-017 used (measuring only the production delay, not test-side setup). Suite green; the test passes deterministically across repeated runs.
|
||||
|
||||
@@ -109,12 +109,26 @@ public static class IntegrationTestEnvironment
|
||||
/// remains the escape hatch for unusual deployments.
|
||||
/// </summary>
|
||||
/// <param name="startDirectory">Starting directory to search from.</param>
|
||||
/// <param name="stopBoundary">
|
||||
/// Optional upper bound for the parent walk. When supplied, the walker checks
|
||||
/// <paramref name="stopBoundary"/> for markers and then stops, ignoring all
|
||||
/// ancestors above it. Tests pass an isolated boundary so the walker cannot
|
||||
/// leak into ambient ancestors (a redirected <c>TMP</c>, a co-located checkout
|
||||
/// at <c>C:\src</c>, an enclosing CI workspace, etc.) that would silently
|
||||
/// satisfy <see cref="IsRepositoryRoot"/> — see IntegrationTests-025.
|
||||
/// Production callers pass <see langword="null"/> so the walk continues to the
|
||||
/// drive root as before.
|
||||
/// </param>
|
||||
/// <returns>The repository root path.</returns>
|
||||
/// <exception cref="InvalidOperationException">
|
||||
/// Thrown when the parent walk exhausts without finding a repository root.
|
||||
/// </exception>
|
||||
internal static string ResolveRepositoryRoot(string startDirectory)
|
||||
internal static string ResolveRepositoryRoot(string startDirectory, string? stopBoundary = null)
|
||||
{
|
||||
string? normalizedBoundary = stopBoundary is null
|
||||
? null
|
||||
: Path.TrimEndingDirectorySeparator(Path.GetFullPath(stopBoundary));
|
||||
|
||||
DirectoryInfo? directory = new(startDirectory);
|
||||
while (directory is not null)
|
||||
{
|
||||
@@ -123,6 +137,15 @@ public static class IntegrationTestEnvironment
|
||||
return directory.FullName;
|
||||
}
|
||||
|
||||
if (normalizedBoundary is not null
|
||||
&& string.Equals(
|
||||
Path.TrimEndingDirectorySeparator(directory.FullName),
|
||||
normalizedBoundary,
|
||||
StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
directory = directory.Parent;
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,11 @@ public sealed class IntegrationTestEnvironmentTests
|
||||
Directory.CreateDirectory(Path.Combine(temporaryRoot, "src"));
|
||||
File.WriteAllText(Path.Combine(temporaryRoot, ".git"), "gitdir: ../.git/worktrees/test");
|
||||
|
||||
string repositoryRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(nestedDirectory);
|
||||
// Pass temporaryRoot as the stop-boundary so the walker can never leak
|
||||
// into ambient ancestors of Path.GetTempPath() (IntegrationTests-025).
|
||||
string repositoryRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(
|
||||
nestedDirectory,
|
||||
stopBoundary: temporaryRoot);
|
||||
|
||||
Assert.Equal(temporaryRoot, repositoryRoot);
|
||||
}
|
||||
@@ -52,6 +56,11 @@ public sealed class IntegrationTestEnvironmentTests
|
||||
/// the walk exhausts without finding a repository root. The previous silent
|
||||
/// fallback to <c>Directory.GetCurrentDirectory()</c> masked misconfiguration
|
||||
/// (IntegrationTests-022); operators get a clear, actionable failure instead.
|
||||
/// The <c>stopBoundary</c> isolates the walker from ambient ancestors of
|
||||
/// <see cref="Path.GetTempPath"/> (a redirected <c>TMP</c>, a co-located checkout
|
||||
/// at <c>C:\src</c>, etc.) that could otherwise satisfy
|
||||
/// <c>IsRepositoryRoot</c> and make this assertion flake on contributor or CI
|
||||
/// boxes — see IntegrationTests-025.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void ResolveRepositoryRoot_NoMarkers_ThrowsInvalidOperationExceptionNamingStartAndMarkers()
|
||||
@@ -64,7 +73,9 @@ public sealed class IntegrationTestEnvironmentTests
|
||||
Directory.CreateDirectory(isolatedStart);
|
||||
|
||||
InvalidOperationException ex = Assert.Throws<InvalidOperationException>(
|
||||
() => IntegrationTestEnvironment.ResolveRepositoryRoot(isolatedStart));
|
||||
() => IntegrationTestEnvironment.ResolveRepositoryRoot(
|
||||
isolatedStart,
|
||||
stopBoundary: isolatedRoot));
|
||||
|
||||
Assert.Contains(isolatedStart, ex.Message, StringComparison.Ordinal);
|
||||
Assert.Contains(".git", ex.Message, StringComparison.Ordinal);
|
||||
@@ -82,4 +93,58 @@ public sealed class IntegrationTestEnvironmentTests
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the <c>stopBoundary</c> parameter on
|
||||
/// <see cref="IntegrationTestEnvironment.ResolveRepositoryRoot"/> isolates the
|
||||
/// walker from ambient ancestors that happen to satisfy <c>IsRepositoryRoot</c>
|
||||
/// — the precise failure mode IntegrationTests-025 describes. The test
|
||||
/// deliberately constructs an outer directory that *does* carry repository-root
|
||||
/// markers (<c>src/</c> + <c>.git</c>) and an inner isolated chain that does
|
||||
/// not. Without the boundary the walker would happily stop at the outer
|
||||
/// directory; with the boundary it must throw because the chain it can see
|
||||
/// carries no markers. A future refactor that dropped or mis-honored the
|
||||
/// boundary would surface here as a failed assertion instead of a silent flake
|
||||
/// in CI.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void ResolveRepositoryRoot_StopBoundary_IsolatesWalkerFromAmbientAncestorMarkers()
|
||||
{
|
||||
string outerRoot = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
|
||||
string innerBoundary = Path.Combine(outerRoot, "inner");
|
||||
string isolatedStart = Path.Combine(innerBoundary, "nested");
|
||||
|
||||
try
|
||||
{
|
||||
// Outer directory satisfies IsRepositoryRoot — it is the "ambient
|
||||
// ancestor" the production walker would otherwise stop at.
|
||||
Directory.CreateDirectory(Path.Combine(outerRoot, "src"));
|
||||
File.WriteAllText(Path.Combine(outerRoot, ".git"), "gitdir: ../.git/worktrees/test");
|
||||
|
||||
// Inner chain carries no markers. The boundary sits between the inner
|
||||
// chain and the outer marker-bearing ancestor.
|
||||
Directory.CreateDirectory(isolatedStart);
|
||||
|
||||
// Sanity: without the boundary the production walker reaches outerRoot
|
||||
// and silently returns it — the exact ambient-ancestor leak.
|
||||
string leakedRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(isolatedStart);
|
||||
Assert.Equal(outerRoot, leakedRoot);
|
||||
|
||||
// With the boundary the walker is sealed inside the inner chain and
|
||||
// must throw — the marker on outerRoot is invisible to it.
|
||||
InvalidOperationException ex = Assert.Throws<InvalidOperationException>(
|
||||
() => IntegrationTestEnvironment.ResolveRepositoryRoot(
|
||||
isolatedStart,
|
||||
stopBoundary: innerBoundary));
|
||||
|
||||
Assert.Contains(isolatedStart, ex.Message, StringComparison.Ordinal);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (Directory.Exists(outerRoot))
|
||||
{
|
||||
Directory.Delete(outerRoot, recursive: true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,15 @@ public sealed class GatewayMetrics : IDisposable
|
||||
_meter.CreateObservableGauge("mxgateway.events.grpc_stream_queue.depth", GetGrpcEventStreamQueueDepth);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the underlying <see cref="Meter"/> instance backing this metrics object. Exposed to tests
|
||||
/// (via <c>InternalsVisibleTo</c>) so a <see cref="MeterListener"/> can filter measurements by
|
||||
/// <see cref="object.ReferenceEquals"/> against this specific instance rather than by the
|
||||
/// process-shared <see cref="MeterName"/>, which would cross-talk between parallel tests that
|
||||
/// each build their own <see cref="GatewayMetrics"/> (Tests-027).
|
||||
/// </summary>
|
||||
internal Meter Meter => _meter;
|
||||
|
||||
/// <summary>
|
||||
/// Records that a session has been opened.
|
||||
/// </summary>
|
||||
|
||||
+41
-2
@@ -147,11 +147,20 @@ public sealed class DashboardApiKeyManagementServiceTests
|
||||
&& entry.Details == "deleted");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-030: when the admin store refuses the delete (returns <c>false</c>), the service
|
||||
/// still emits a <c>dashboard-delete-key</c> audit entry with <c>Details = "not-found-or-active"</c>
|
||||
/// because <c>AppendAuditAsync</c> runs unconditionally after the store call. A regression that
|
||||
/// moved the audit-append call inside the <c>if (deleted)</c> branch would silently drop the
|
||||
/// audit trail for refused deletes — a real audit-completeness gap. This test pins both the
|
||||
/// friendly-error response AND the unconditional audit entry.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task DeleteAsync_WhenStoreRefuses_ReportsFriendlyError()
|
||||
public async Task DeleteAsync_WhenStoreRefuses_ReportsFriendlyErrorAndAudits()
|
||||
{
|
||||
FakeApiKeyAdminStore adminStore = new() { DeleteResult = false };
|
||||
DashboardApiKeyManagementService service = CreateService(adminStore);
|
||||
FakeApiKeyAuditStore auditStore = new();
|
||||
DashboardApiKeyManagementService service = CreateService(adminStore, auditStore);
|
||||
|
||||
DashboardApiKeyManagementResult result = await service.DeleteAsync(
|
||||
CreateAuthorizedUser(),
|
||||
@@ -160,6 +169,36 @@ public sealed class DashboardApiKeyManagementServiceTests
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Contains("Revoke", result.Message, StringComparison.Ordinal);
|
||||
|
||||
ApiKeyAuditEntry auditEntry = Assert.Single(auditStore.Entries);
|
||||
Assert.Equal("dashboard-delete-key", auditEntry.EventType);
|
||||
Assert.Equal("operator01", auditEntry.KeyId);
|
||||
Assert.Equal("not-found-or-active", auditEntry.Details);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-030: <see cref="DashboardApiKeyManagementService.DeleteAsync"/> calls
|
||||
/// <c>ValidateKeyId</c> after the authorisation check. A blank key id must fail with the
|
||||
/// shared "API key id is required." message before any store or audit call runs.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData("")]
|
||||
[InlineData(" ")]
|
||||
[InlineData("\t")]
|
||||
public async Task DeleteAsync_BlankKeyId_ReturnsFailure(string blankKeyId)
|
||||
{
|
||||
FakeApiKeyAdminStore adminStore = new();
|
||||
FakeApiKeyAuditStore auditStore = new();
|
||||
DashboardApiKeyManagementService service = CreateService(adminStore, auditStore);
|
||||
|
||||
DashboardApiKeyManagementResult result = await service.DeleteAsync(
|
||||
CreateAuthorizedUser(),
|
||||
blankKeyId,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Equal(0, adminStore.DeleteCount);
|
||||
Assert.Empty(auditStore.Entries);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
+26
-1
@@ -87,7 +87,12 @@ public sealed class DashboardSessionAdminServiceTests
|
||||
Assert.True(result.Succeeded);
|
||||
Assert.Equal(1, sessionManager.KillCount);
|
||||
Assert.Equal("session-1", sessionManager.LastKilledSessionId);
|
||||
Assert.False(string.IsNullOrWhiteSpace(sessionManager.LastKillReason));
|
||||
|
||||
// Tests-028: pin the literal reason string so a future caller-side change is a deliberate
|
||||
// test update rather than a silent drift. DashboardSessionAdminService passes a hard-coded
|
||||
// "dashboard-admin-kill" so the worker-exit metric (mxgateway.workers.killed) carries a
|
||||
// stable, machine-greppable reason tag.
|
||||
Assert.Equal("dashboard-admin-kill", sessionManager.LastKillReason);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
@@ -105,6 +110,26 @@ public sealed class DashboardSessionAdminServiceTests
|
||||
Assert.Equal(0, sessionManager.KillCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-029: <c>CloseSessionAsync</c> has the same blank-session-id guard as
|
||||
/// <c>KillWorkerAsync</c> but previously had no parallel test. Coverage was asymmetric.
|
||||
/// A guard-removal regression on the close path would slip through.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task CloseSessionAsync_BlankSessionId_ReturnsFailure()
|
||||
{
|
||||
FakeSessionManager sessionManager = new();
|
||||
DashboardSessionAdminService service = CreateService(sessionManager);
|
||||
|
||||
DashboardSessionAdminResult result = await service.CloseSessionAsync(
|
||||
CreateUser(DashboardRoles.Admin),
|
||||
" ",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Equal(0, sessionManager.CloseCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CanManage_RejectsUnauthenticatedAndViewer()
|
||||
{
|
||||
|
||||
@@ -17,6 +17,13 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
/// reconnect delay and then re-open the subscription. Before the fix,
|
||||
/// the publisher exited on the first non-cancellation exception and
|
||||
/// the dashboard's snapshot stream went silent until process restart.
|
||||
///
|
||||
/// <para>Tests-031: the reconnect-gap measurement is bounded between the
|
||||
/// moment the first subscribe actually <c>throw</c>s and the moment the
|
||||
/// second subscribe begins. Measuring from <c>startedAt</c> (pre-<c>StartAsync</c>)
|
||||
/// baselined scheduling overhead into the budget and made the lower bound
|
||||
/// flaky on slow CI; recording <c>firstThrowAt</c> inside the fake removes
|
||||
/// that baseline so only the <c>Task.Delay(reconnectDelay)</c> contributes.</para>
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay()
|
||||
@@ -31,7 +38,6 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
reconnectDelay);
|
||||
|
||||
using CancellationTokenSource cts = new();
|
||||
DateTimeOffset startedAt = DateTimeOffset.UtcNow;
|
||||
Task execute = publisher.StartAsync(cts.Token);
|
||||
await execute.WaitAsync(TestTimeout);
|
||||
|
||||
@@ -42,6 +48,8 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
await WaitUntilAsync(() => snapshotService.SubscribeCount >= 2);
|
||||
await WaitUntilAsync(() => hubContext.SendCount >= 1);
|
||||
|
||||
DateTimeOffset firstThrowAt = snapshotService.FirstThrowAt
|
||||
?? throw new InvalidOperationException("First subscribe did not record a throw timestamp.");
|
||||
DateTimeOffset secondSubscribeAt = snapshotService.SecondSubscribeAt
|
||||
?? throw new InvalidOperationException("Second subscribe did not record a timestamp.");
|
||||
|
||||
@@ -52,10 +60,13 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
$"Expected at least 2 subscribe calls, got {snapshotService.SubscribeCount}.");
|
||||
Assert.True(hubContext.SendCount >= 1);
|
||||
|
||||
// The gap between the throw (first subscribe) and the reconnect
|
||||
// (second subscribe) is bounded below by the reconnect delay. We
|
||||
// give a small slack (10ms) for scheduling jitter on slow CI VMs.
|
||||
TimeSpan gap = secondSubscribeAt - startedAt;
|
||||
// Tests-031: the gap is measured from the moment the first subscribe
|
||||
// actually threw (inside the fake) to the moment the second subscribe
|
||||
// began (also inside the fake). This isolates the publisher's
|
||||
// Task.Delay(reconnectDelay) — no StartAsync / scheduling overhead in
|
||||
// the baseline. The 10ms slack absorbs Task.Delay's coarse Windows
|
||||
// timer quantum (~15ms) when the underlying scheduler wakes early.
|
||||
TimeSpan gap = secondSubscribeAt - firstThrowAt;
|
||||
Assert.True(gap >= reconnectDelay - TimeSpan.FromMilliseconds(10),
|
||||
$"Expected reconnect gap >= {reconnectDelay.TotalMilliseconds}ms; got {gap.TotalMilliseconds}ms.");
|
||||
}
|
||||
@@ -100,6 +111,15 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
private sealed class ThrowOnceThenYieldSnapshotService : IDashboardSnapshotService
|
||||
{
|
||||
public int SubscribeCount { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Tests-031: the wall-clock instant the first <c>WatchSnapshotsAsync</c> throws.
|
||||
/// The reconnect-gap assertion is measured against this timestamp (NOT the
|
||||
/// pre-<c>StartAsync</c> wall clock) so scheduling overhead is not baselined
|
||||
/// into the lower bound.
|
||||
/// </summary>
|
||||
public DateTimeOffset? FirstThrowAt { get; private set; }
|
||||
|
||||
public DateTimeOffset? SecondSubscribeAt { get; private set; }
|
||||
|
||||
public DashboardSnapshot GetSnapshot()
|
||||
@@ -118,6 +138,7 @@ public sealed class DashboardSnapshotPublisherTests
|
||||
// First call: throw after a brief yield so the publisher
|
||||
// observes us as a live producer that failed.
|
||||
await Task.Yield();
|
||||
FirstThrowAt = DateTimeOffset.UtcNow;
|
||||
throw new InvalidOperationException("simulated transient snapshot failure");
|
||||
}
|
||||
|
||||
|
||||
@@ -194,7 +194,17 @@ public sealed class MxAccessGatewayServiceTests
|
||||
Assert.Equal("session-1", sessionManager.LastReadEventsSessionId);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that StreamEvents records send duration metrics when an event is written.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that <c>StreamEvents</c> records the send-duration histogram per event.
|
||||
///
|
||||
/// <para>Tests-027 (concurrency flake): the listener must filter by the specific
|
||||
/// <see cref="System.Diagnostics.Metrics.Meter"/> instance owned by this test, not by the process-shared
|
||||
/// <see cref="GatewayMetrics.MeterName"/>. Otherwise a parallel test that constructs its own
|
||||
/// <see cref="GatewayMetrics"/> and records <c>mxgateway.events.stream_send.duration</c> would
|
||||
/// cross-contaminate <c>families</c> and break the equality assertion below. See the companion
|
||||
/// <see cref="StreamEvents_RecordSendDurationListener_IgnoresMeasurementsFromOtherMetersWithSameName"/>
|
||||
/// regression for the cross-talk reproduction.</para>
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task StreamEvents_WhenEventIsWritten_RecordsSendDuration()
|
||||
{
|
||||
@@ -203,7 +213,7 @@ public sealed class MxAccessGatewayServiceTests
|
||||
List<string> families = [];
|
||||
listener.InstrumentPublished = (instrument, meterListener) =>
|
||||
{
|
||||
if (instrument.Meter.Name == GatewayMetrics.MeterName
|
||||
if (ReferenceEquals(instrument.Meter, metrics.Meter)
|
||||
&& instrument.Name == "mxgateway.events.stream_send.duration")
|
||||
{
|
||||
meterListener.EnableMeasurementEvents(instrument);
|
||||
@@ -212,7 +222,8 @@ public sealed class MxAccessGatewayServiceTests
|
||||
listener.SetMeasurementEventCallback<double>(
|
||||
(instrument, measurement, tags, _) =>
|
||||
{
|
||||
if (instrument.Name != "mxgateway.events.stream_send.duration")
|
||||
if (!ReferenceEquals(instrument.Meter, metrics.Meter)
|
||||
|| instrument.Name != "mxgateway.events.stream_send.duration")
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -239,6 +250,69 @@ public sealed class MxAccessGatewayServiceTests
|
||||
Assert.Equal([MxEventFamily.OnDataChange.ToString()], families);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-027 regression: a <see cref="MeterListener"/> that filters by the specific
|
||||
/// <see cref="System.Diagnostics.Metrics.Meter"/> instance (via <see cref="object.ReferenceEquals"/>)
|
||||
/// must NOT observe measurements recorded on a different <see cref="GatewayMetrics"/> that shares
|
||||
/// the same <see cref="GatewayMetrics.MeterName"/>. This is the cross-talk vector that previously
|
||||
/// caused <c>StreamEvents_WhenEventIsWritten_RecordsSendDuration</c> to fail intermittently when
|
||||
/// run in parallel with another test recording the same histogram.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task StreamEvents_RecordSendDurationListener_IgnoresMeasurementsFromOtherMetersWithSameName()
|
||||
{
|
||||
using GatewayMetrics metricsUnderTest = new();
|
||||
using GatewayMetrics otherMetrics = new();
|
||||
using MeterListener listener = new();
|
||||
List<string> families = [];
|
||||
listener.InstrumentPublished = (instrument, meterListener) =>
|
||||
{
|
||||
// Subscribe to the stream_send histogram on BOTH meters so the listener
|
||||
// would observe a cross-talk measurement if the callback did not filter.
|
||||
if (instrument.Name == "mxgateway.events.stream_send.duration")
|
||||
{
|
||||
meterListener.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<double>(
|
||||
(instrument, measurement, tags, _) =>
|
||||
{
|
||||
if (!ReferenceEquals(instrument.Meter, metricsUnderTest.Meter)
|
||||
|| instrument.Name != "mxgateway.events.stream_send.duration")
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (KeyValuePair<string, object?> tag in tags)
|
||||
{
|
||||
if (tag.Key == "family" && tag.Value is string family)
|
||||
{
|
||||
families.Add(family);
|
||||
}
|
||||
}
|
||||
});
|
||||
listener.Start();
|
||||
|
||||
// Simulate the cross-talk: another test's GatewayMetrics records a value
|
||||
// before the test-under-test does its single event publish. The listener
|
||||
// must filter this out by Meter reference.
|
||||
otherMetrics.RecordEventStreamSend(MxEventFamily.OnWriteComplete.ToString(), TimeSpan.FromMilliseconds(123));
|
||||
|
||||
FakeSessionManager sessionManager = new();
|
||||
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2));
|
||||
MxAccessGatewayService service = CreateService(sessionManager, metrics: metricsUnderTest);
|
||||
RecordingServerStreamWriter<MxEvent> writer = new();
|
||||
|
||||
await service.StreamEvents(
|
||||
new StreamEventsRequest { SessionId = "session-1" },
|
||||
writer,
|
||||
new TestServerCallContext());
|
||||
|
||||
// Only the test-under-test's OnDataChange recording should be observed —
|
||||
// the OnAlarm recording on the sibling meter must NOT leak through.
|
||||
Assert.Equal([MxEventFamily.OnDataChange.ToString()], families);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that CloseSession throws InvalidArgument when session ID is blank.</summary>
|
||||
[Fact]
|
||||
public async Task CloseSession_WithBlankSessionId_ThrowsInvalidArgument()
|
||||
|
||||
@@ -466,7 +466,12 @@ public sealed class SessionManagerTests
|
||||
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that killing a worker removes the session from the registry without calling shutdown.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that killing a worker removes the session from the registry without calling shutdown.
|
||||
/// Tests-028: also pins the <c>reason</c> argument propagating through
|
||||
/// <c>SessionManager.KillWorkerAsync</c> → <c>session.KillWorker(reason)</c> →
|
||||
/// <c>IWorkerClient.Kill(reason)</c>.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task KillWorkerAsync_KillsWorkerAndRemovesSession()
|
||||
{
|
||||
@@ -480,12 +485,54 @@ public sealed class SessionManagerTests
|
||||
Assert.False(result.AlreadyClosed);
|
||||
Assert.Equal(SessionState.Closed, result.FinalState);
|
||||
Assert.Equal(1, workerClient.KillCount);
|
||||
Assert.Equal("test-kill", workerClient.LastKillReason);
|
||||
Assert.Equal(0, workerClient.ShutdownCount);
|
||||
Assert.False(manager.TryGetSession(session.SessionId, out _));
|
||||
Assert.Equal(1, metrics.GetSnapshot().SessionsClosed);
|
||||
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-028: <see cref="SessionManager.KillWorkerAsync"/> guards its <c>reason</c> argument with
|
||||
/// <see cref="ArgumentException.ThrowIfNullOrWhiteSpace"/>. A blank or whitespace reason must throw
|
||||
/// <see cref="ArgumentException"/> before any session lookup or worker call runs.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData("")]
|
||||
[InlineData(" ")]
|
||||
[InlineData("\t")]
|
||||
public async Task KillWorkerAsync_WithBlankReason_ThrowsArgumentException(string blankReason)
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentException>(
|
||||
async () => await manager.KillWorkerAsync(session.SessionId, blankReason, CancellationToken.None));
|
||||
|
||||
Assert.Equal(0, workerClient.KillCount);
|
||||
Assert.True(manager.TryGetSession(session.SessionId, out _));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests-028: <see cref="ArgumentException.ThrowIfNullOrWhiteSpace"/> also rejects null.
|
||||
/// <see cref="Theory"/> with <see cref="InlineDataAttribute"/> cannot carry <c>null</c> for a
|
||||
/// non-nullable string parameter on .NET 10, so the null case is its own fact.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task KillWorkerAsync_WithNullReason_ThrowsArgumentNullException()
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
async () => await manager.KillWorkerAsync(session.SessionId, null!, CancellationToken.None));
|
||||
|
||||
Assert.Equal(0, workerClient.KillCount);
|
||||
Assert.True(manager.TryGetSession(session.SessionId, out _));
|
||||
}
|
||||
|
||||
/// <summary>Verifies that killing the worker for an unknown session raises SessionNotFound.</summary>
|
||||
[Fact]
|
||||
public async Task KillWorkerAsync_WhenSessionMissing_ThrowsSessionNotFound()
|
||||
@@ -827,6 +874,15 @@ public sealed class SessionManagerTests
|
||||
/// <summary>Gets the number of times kill was called on the fake worker client.</summary>
|
||||
public int KillCount { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the last reason argument observed by <see cref="Kill"/>. Tests-028:
|
||||
/// pins the reason-string propagation through
|
||||
/// <c>SessionManager.KillWorkerAsync</c> → <c>session.KillWorker(reason)</c> →
|
||||
/// <c>IWorkerClient.Kill(reason)</c>. Without this, the chain could silently
|
||||
/// drop or substitute the reason argument and existing tests would still pass.
|
||||
/// </summary>
|
||||
public string? LastKillReason { get; private set; }
|
||||
|
||||
/// <summary>Gets the number of times dispose was called on the fake worker client.</summary>
|
||||
public int DisposeCount { get; private set; }
|
||||
|
||||
@@ -913,6 +969,7 @@ public sealed class SessionManagerTests
|
||||
public void Kill(string reason)
|
||||
{
|
||||
KillCount++;
|
||||
LastKillReason = reason;
|
||||
if (KillException is not null)
|
||||
{
|
||||
throw KillException;
|
||||
|
||||
Reference in New Issue
Block a user