a0203503a7
Re-reviewed every module/client against the 10-category checklist
(REVIEW-PROCESS.md) at commit 1cd51bb, filed 72 new findings, and
fixed them in three priority waves (3 High, 17 Medium, 52 Low).
Highs
- Server-017: enumerate AcknowledgeAlarm / QueryActiveAlarms in
GatewayGrpcScopeResolver so non-admin keys can use them; document
the mapping in docs/Authorization.md; add interceptor tests.
- Client.Java-013: add the five missing bulk-method stubs to the
CLI FakeSession so the test module compiles on a clean tree.
- Client.Rust-013: fix the clippy::doc_lazy_continuation regression
in generated tonic code by reformatting the ReadBulkCommand proto
comment and scoping a #![allow(...)] to the generated submodules.
Mediums (highlights)
- Server: unify GatewaySession state-lock discipline (-015) and
make DisposeAsync race-safe against in-flight CloseAsync (-016);
add constraint-enforcement test coverage for the bulk-plan path
(-021).
- Worker: introduce StaRuntimeShutdownException so RunAlarmPollLoop
can distinguish graceful shutdown from a real STA-affinity
violation (-016); have the watchdog skip StaHung while
CurrentCommandCorrelationId is non-empty so a legitimate slow
ReadBulk no longer self-faults (-017).
- Tests: add per-method round-trip + cancellation coverage for the
11 GatewaySession bulk methods (-013); replace the real TCP probe
in GalaxyHierarchyCacheTests with an IGalaxyRepository fake
(-016).
- IntegrationTests: drive the StreamEvents writer in the live Write
test and assert OnWriteComplete (-012); add live tests for
Unadvise/RemoveItem/Unregister ordering, WriteSecured, and
abnormal worker exit (-014).
- Worker.Tests: replace MxAccessSession reflection with an internal
CreateForTesting factory (-016); cover WorkerCancel and
unexpected-body envelope branches (-017).
- Client.Java: cancel MxEventStream when close() races
beforeStart() (-014); return a CancellingCompletableFuture that
actually forwards cancellation through .thenApply chains (-015).
- Client.Python: drop the silent localhost-plaintext downgrade in
the CLI; require explicit --plaintext (-013).
- Client.Rust: stop bench-read-bulk from polluting success-latency
histograms with failed-call durations (-015); add coverage for
the five MalformedReply paths, the bulk-write helpers, the
Error::Unavailable mapping, and the unary-fault path (-016).
- Contracts: extend docs/Contracts.md with the bulk read/write
command family (-009).
Lows (highlights)
- Server: cap GalaxyGlobMatcher.RegexCache; align
WorkerAlarmRpcDispatcher missing-session handling; drop the
duplicate dashboard @page routes; refresh IAlarmRpcDispatcher
XML doc.
- Worker: surface SetXmlAlarmQuery COM failures; remove dead
subscriptionExpression / ExecutingCommand arms; preserve
factory-supplied runtime sessions; split MxAlarmSnapshot.cs into
three files.
- Tests: dispose the WebApplication in seven test classes; rebuild
FakeWorkerProcess.WaitForExitAsync on a real TaskCompletionSource;
switch the heartbeat-expires test to ManualTimeProvider;
add InvariantCulture to the remaining DateTimeOffset.Parse sites;
document GalaxyFilterInputSafetyTests in GatewayTesting.md.
- IntegrationTests: comment fixes, RecordingServerStreamWriter
IDisposable, class-level [Trait], single-source ZB default
connection string.
- Worker.Tests: replace silent-return gating with LiveMxAccessFact
so tests with absent env vars are skipped rather than passing;
PascalCase rename of probe
[Fact]s; deterministic deadline test; new frame-protocol error
tests; ComputeTransitions diff-coverage; relocate dev-rig probes
to Probes/.
- Contracts: add round-trip coverage and per-field redaction /
Galaxy-identifier comments to the protos.
- Client.Dotnet: introduce clients/dotnet/Directory.Build.props so
TreatWarningsAsErrors / analysers apply; document
DiscoverHierarchyOptions and IMxGatewayCliClient; require typed
bulk-read handles in CLI; surface AcknowledgeAlarm transport
faults through Translate().
- Client.Go: remove dead code in alarms_test / fakeGalaxyServer /
runWriteBulkVariant; document the six new subcommands in
writeUsage; drain galaxy-watch events on limit; switch io.EOF
comparisons to errors.Is.
- Client.Java: shared shutdown helpers + new shutdownTimeout
option; regex-based credential redaction; Long.toUnsignedString
for uint64 sequence; doc fixes.
- Client.Python: combine duplicate imports; add coverage for
_percentile / bench-read-bulk / MAX_AGGREGATE_EVENTS /
_api_key_from_env; populate pyproject metadata and ship py.typed.
- Client.Rust: expose next_correlation_id() so CLI ping/close
stop hard-coding correlation IDs; resync RustClientDesign.md
with the current Session / Error surface and CLI subcommand set.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
422 lines
11 KiB
Go
422 lines
11 KiB
Go
package mxgateway
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/test/bufconn"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
func TestGalaxyTestConnectionAttachesAuthAndReturnsOk(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
testReply: &pb.TestConnectionReply{Ok: true},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
ok, err := client.TestConnection(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("TestConnection() error = %v", err)
|
|
}
|
|
if !ok {
|
|
t.Fatalf("TestConnection() ok = false, want true")
|
|
}
|
|
if got := fake.testAuth; got != "Bearer test-api-key" {
|
|
t.Fatalf("authorization metadata = %q, want %q", got, "Bearer test-api-key")
|
|
}
|
|
}
|
|
|
|
func TestGalaxyGetLastDeployTimeReturnsAbsentForPresentFalse(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
deployReply: &pb.GetLastDeployTimeReply{Present: false},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
got, present, err := client.GetLastDeployTime(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("GetLastDeployTime() error = %v", err)
|
|
}
|
|
if present {
|
|
t.Fatalf("present = true, want false")
|
|
}
|
|
if !got.IsZero() {
|
|
t.Fatalf("time = %v, want zero", got)
|
|
}
|
|
}
|
|
|
|
func TestGalaxyGetLastDeployTimeReturnsTimestampWhenPresent(t *testing.T) {
|
|
want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC)
|
|
fake := &fakeGalaxyServer{
|
|
deployReply: &pb.GetLastDeployTimeReply{
|
|
Present: true,
|
|
TimeOfLastDeploy: timestamppb.New(want),
|
|
},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
got, present, err := client.GetLastDeployTime(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("GetLastDeployTime() error = %v", err)
|
|
}
|
|
if !present {
|
|
t.Fatalf("present = false, want true")
|
|
}
|
|
if !got.Equal(want) {
|
|
t.Fatalf("time = %v, want %v", got, want)
|
|
}
|
|
}
|
|
|
|
func TestGalaxyGetLastDeployTimeReturnsAbsentWhenTimestampNil(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
deployReply: &pb.GetLastDeployTimeReply{Present: true, TimeOfLastDeploy: nil},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
got, present, err := client.GetLastDeployTime(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("GetLastDeployTime() error = %v", err)
|
|
}
|
|
if present {
|
|
t.Fatalf("present = true, want false (nil timestamp)")
|
|
}
|
|
if !got.IsZero() {
|
|
t.Fatalf("time = %v, want zero", got)
|
|
}
|
|
}
|
|
|
|
func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
discoverReply: &pb.DiscoverHierarchyReply{
|
|
Objects: []*pb.GalaxyObject{
|
|
{
|
|
GobjectId: 1,
|
|
TagName: "TestMachine_001",
|
|
ContainedName: "TestMachine_001",
|
|
BrowseName: "TestMachine_001",
|
|
IsArea: false,
|
|
CategoryId: 7,
|
|
TemplateChain: []string{"$Object", "$AppObject"},
|
|
Attributes: []*pb.GalaxyAttribute{
|
|
{
|
|
AttributeName: "DownloadPath",
|
|
FullTagReference: "TestMachine_001.DownloadPath",
|
|
MxDataType: 8,
|
|
DataTypeName: "String",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
GobjectId: 2,
|
|
TagName: "TestMachine_002",
|
|
ContainedName: "TestMachine_002",
|
|
ParentGobjectId: 1,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
objects, err := client.DiscoverHierarchy(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("DiscoverHierarchy() error = %v", err)
|
|
}
|
|
if len(objects) != 2 {
|
|
t.Fatalf("len(objects) = %d, want 2", len(objects))
|
|
}
|
|
if objects[0].GetTagName() != "TestMachine_001" {
|
|
t.Fatalf("objects[0].TagName = %q", objects[0].GetTagName())
|
|
}
|
|
if len(objects[0].GetAttributes()) != 1 {
|
|
t.Fatalf("len(attributes) = %d, want 1", len(objects[0].GetAttributes()))
|
|
}
|
|
if objects[0].GetAttributes()[0].GetFullTagReference() != "TestMachine_001.DownloadPath" {
|
|
t.Fatalf("FullTagReference = %q", objects[0].GetAttributes()[0].GetFullTagReference())
|
|
}
|
|
}
|
|
|
|
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
|
|
fake := &fakeGalaxyServer{failTest: true}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
_, err := client.TestConnection(context.Background())
|
|
if err == nil {
|
|
t.Fatal("TestConnection() error = nil, want error")
|
|
}
|
|
var gwErr *GatewayError
|
|
if !errors.As(err, &gwErr) {
|
|
t.Fatalf("error %T does not support errors.As(*GatewayError)", err)
|
|
}
|
|
if gwErr.Op != "galaxy test connection" {
|
|
t.Fatalf("Op = %q, want %q", gwErr.Op, "galaxy test connection")
|
|
}
|
|
}
|
|
|
|
func TestGalaxyWatchDeployEventsReceivesEventsInOrder(t *testing.T) {
|
|
bootstrap := time.Date(2026, 4, 28, 10, 0, 0, 0, time.UTC)
|
|
deploy1 := time.Date(2026, 4, 28, 10, 5, 0, 0, time.UTC)
|
|
deploy2 := time.Date(2026, 4, 28, 10, 6, 0, 0, time.UTC)
|
|
fake := &fakeGalaxyServer{
|
|
watchEvents: []*pb.DeployEvent{
|
|
{
|
|
Sequence: 1,
|
|
ObservedAt: timestamppb.New(bootstrap),
|
|
TimeOfLastDeploy: timestamppb.New(deploy1),
|
|
TimeOfLastDeployPresent: true,
|
|
ObjectCount: 10,
|
|
AttributeCount: 42,
|
|
},
|
|
{
|
|
Sequence: 2,
|
|
ObservedAt: timestamppb.New(deploy2),
|
|
TimeOfLastDeploy: timestamppb.New(deploy2),
|
|
TimeOfLastDeployPresent: true,
|
|
ObjectCount: 11,
|
|
AttributeCount: 44,
|
|
},
|
|
},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
events, errs, err := client.WatchDeployEvents(ctx, nil)
|
|
if err != nil {
|
|
t.Fatalf("WatchDeployEvents() error = %v", err)
|
|
}
|
|
|
|
got := make([]*DeployEvent, 0, 2)
|
|
loop:
|
|
for {
|
|
select {
|
|
case ev, ok := <-events:
|
|
if !ok {
|
|
break loop
|
|
}
|
|
got = append(got, ev)
|
|
case errVal := <-errs:
|
|
if errVal != nil {
|
|
t.Fatalf("error channel: %v", errVal)
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timeout waiting for events; got %d", len(got))
|
|
}
|
|
}
|
|
|
|
if len(got) != 2 {
|
|
t.Fatalf("len(events) = %d, want 2", len(got))
|
|
}
|
|
if got[0].GetSequence() != 1 || got[1].GetSequence() != 2 {
|
|
t.Fatalf("sequences = [%d,%d], want [1,2]", got[0].GetSequence(), got[1].GetSequence())
|
|
}
|
|
if !got[0].GetTimeOfLastDeployPresent() {
|
|
t.Fatalf("event[0] TimeOfLastDeployPresent = false, want true")
|
|
}
|
|
if got[0].GetObjectCount() != 10 || got[0].GetAttributeCount() != 42 {
|
|
t.Fatalf("event[0] counts = (%d,%d), want (10,42)", got[0].GetObjectCount(), got[0].GetAttributeCount())
|
|
}
|
|
if !got[0].GetTimeOfLastDeploy().AsTime().Equal(deploy1) {
|
|
t.Fatalf("event[0] TimeOfLastDeploy = %v, want %v", got[0].GetTimeOfLastDeploy().AsTime(), deploy1)
|
|
}
|
|
}
|
|
|
|
func TestGalaxyWatchDeployEventsForwardsLastSeenDeployTime(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
watchEvents: []*pb.DeployEvent{
|
|
{Sequence: 7},
|
|
},
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
lastSeen := time.Date(2026, 4, 28, 9, 0, 0, 0, time.UTC)
|
|
events, errs, err := client.WatchDeployEvents(ctx, &lastSeen)
|
|
if err != nil {
|
|
t.Fatalf("WatchDeployEvents() error = %v", err)
|
|
}
|
|
|
|
// Drain everything.
|
|
loop:
|
|
for {
|
|
select {
|
|
case _, ok := <-events:
|
|
if !ok {
|
|
break loop
|
|
}
|
|
case errVal := <-errs:
|
|
if errVal != nil {
|
|
t.Fatalf("error channel: %v", errVal)
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timeout draining events")
|
|
}
|
|
}
|
|
|
|
if fake.watchRequest == nil {
|
|
t.Fatalf("server did not receive a request")
|
|
}
|
|
gotTs := fake.watchRequest.GetLastSeenDeployTime()
|
|
if gotTs == nil {
|
|
t.Fatalf("LastSeenDeployTime = nil, want %v", lastSeen)
|
|
}
|
|
if !gotTs.AsTime().Equal(lastSeen) {
|
|
t.Fatalf("LastSeenDeployTime = %v, want %v", gotTs.AsTime(), lastSeen)
|
|
}
|
|
}
|
|
|
|
func TestGalaxyWatchDeployEventsCancelTearsDownStream(t *testing.T) {
|
|
fake := &fakeGalaxyServer{
|
|
watchEvents: []*pb.DeployEvent{
|
|
{Sequence: 1},
|
|
},
|
|
watchHoldOpen: true,
|
|
}
|
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
|
defer cleanup()
|
|
|
|
streamCtx, cancelStream := context.WithCancel(context.Background())
|
|
|
|
events, errs, err := client.WatchDeployEvents(streamCtx, nil)
|
|
if err != nil {
|
|
t.Fatalf("WatchDeployEvents() error = %v", err)
|
|
}
|
|
|
|
// Wait for the bootstrap event to arrive.
|
|
select {
|
|
case ev, ok := <-events:
|
|
if !ok {
|
|
t.Fatalf("events channel closed before delivering bootstrap")
|
|
}
|
|
if ev.GetSequence() != 1 {
|
|
t.Fatalf("got seq=%d, want 1", ev.GetSequence())
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("timeout waiting for bootstrap event")
|
|
}
|
|
|
|
// Cancel the stream; both channels must close cleanly without delivering an error.
|
|
cancelStream()
|
|
|
|
deadline := time.After(2 * time.Second)
|
|
for events != nil || errs != nil {
|
|
select {
|
|
case _, ok := <-events:
|
|
if !ok {
|
|
events = nil
|
|
}
|
|
case errVal, ok := <-errs:
|
|
if !ok {
|
|
errs = nil
|
|
continue
|
|
}
|
|
if errVal != nil {
|
|
t.Fatalf("error after cancel: %v", errVal)
|
|
}
|
|
case <-deadline:
|
|
t.Fatalf("channels did not close after cancel; events nil=%v errs nil=%v", events == nil, errs == nil)
|
|
}
|
|
}
|
|
}
|
|
|
|
func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient, func()) {
|
|
t.Helper()
|
|
|
|
listener := bufconn.Listen(bufSize)
|
|
server := grpc.NewServer()
|
|
pb.RegisterGalaxyRepositoryServer(server, fake)
|
|
go func() {
|
|
if err := server.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
|
|
t.Errorf("bufconn server failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
dialer := func(ctx context.Context, _ string) (net.Conn, error) {
|
|
return listener.DialContext(ctx)
|
|
}
|
|
// grpc.NewClient defaults to the dns scheme; use passthrough so the
|
|
// bufconn fake target reaches the context dialer unresolved.
|
|
client, err := DialGalaxy(context.Background(), Options{
|
|
Endpoint: "passthrough:///bufnet",
|
|
APIKey: "test-api-key",
|
|
Plaintext: true,
|
|
DialOptions: []grpc.DialOption{
|
|
grpc.WithContextDialer(dialer),
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("DialGalaxy() error = %v", err)
|
|
}
|
|
|
|
return client, func() {
|
|
client.Close()
|
|
server.Stop()
|
|
listener.Close()
|
|
}
|
|
}
|
|
|
|
type fakeGalaxyServer struct {
|
|
pb.UnimplementedGalaxyRepositoryServer
|
|
|
|
testReply *pb.TestConnectionReply
|
|
testAuth string
|
|
failTest bool
|
|
deployReply *pb.GetLastDeployTimeReply
|
|
discoverReply *pb.DiscoverHierarchyReply
|
|
watchEvents []*pb.DeployEvent
|
|
watchRequest *pb.WatchDeployEventsRequest
|
|
watchHoldOpen bool
|
|
}
|
|
|
|
func (s *fakeGalaxyServer) TestConnection(ctx context.Context, req *pb.TestConnectionRequest) (*pb.TestConnectionReply, error) {
|
|
s.testAuth = authorizationFromContext(ctx)
|
|
if s.failTest {
|
|
return nil, errors.New("simulated failure")
|
|
}
|
|
if s.testReply != nil {
|
|
return s.testReply, nil
|
|
}
|
|
return &pb.TestConnectionReply{Ok: true}, nil
|
|
}
|
|
|
|
func (s *fakeGalaxyServer) GetLastDeployTime(ctx context.Context, req *pb.GetLastDeployTimeRequest) (*pb.GetLastDeployTimeReply, error) {
|
|
if s.deployReply != nil {
|
|
return s.deployReply, nil
|
|
}
|
|
return &pb.GetLastDeployTimeReply{Present: false}, nil
|
|
}
|
|
|
|
func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) {
|
|
if s.discoverReply != nil {
|
|
return s.discoverReply, nil
|
|
}
|
|
return &pb.DiscoverHierarchyReply{}, nil
|
|
}
|
|
|
|
func (s *fakeGalaxyServer) WatchDeployEvents(req *pb.WatchDeployEventsRequest, stream grpc.ServerStreamingServer[pb.DeployEvent]) error {
|
|
s.watchRequest = req
|
|
for _, event := range s.watchEvents {
|
|
if err := stream.Send(event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if s.watchHoldOpen {
|
|
<-stream.Context().Done()
|
|
}
|
|
return nil
|
|
}
|