package mxgateway import ( "context" "errors" "net" "testing" "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" "google.golang.org/protobuf/types/known/timestamppb" ) func TestGalaxyTestConnectionAttachesAuthAndReturnsOk(t *testing.T) { fake := &fakeGalaxyServer{ testReply: &pb.TestConnectionReply{Ok: true}, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() ok, err := client.TestConnection(context.Background()) if err != nil { t.Fatalf("TestConnection() error = %v", err) } if !ok { t.Fatalf("TestConnection() ok = false, want true") } if got := fake.testAuth; got != "Bearer test-api-key" { t.Fatalf("authorization metadata = %q, want %q", got, "Bearer test-api-key") } } func TestGalaxyGetLastDeployTimeReturnsAbsentForPresentFalse(t *testing.T) { fake := &fakeGalaxyServer{ deployReply: &pb.GetLastDeployTimeReply{Present: false}, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() got, present, err := client.GetLastDeployTime(context.Background()) if err != nil { t.Fatalf("GetLastDeployTime() error = %v", err) } if present { t.Fatalf("present = true, want false") } if !got.IsZero() { t.Fatalf("time = %v, want zero", got) } } func TestGalaxyGetLastDeployTimeReturnsTimestampWhenPresent(t *testing.T) { want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC) fake := &fakeGalaxyServer{ deployReply: &pb.GetLastDeployTimeReply{ Present: true, TimeOfLastDeploy: timestamppb.New(want), }, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() got, present, err := client.GetLastDeployTime(context.Background()) if err != nil { t.Fatalf("GetLastDeployTime() error = %v", err) } if !present { t.Fatalf("present = false, want true") } if !got.Equal(want) { t.Fatalf("time = %v, want %v", got, want) } } func TestGalaxyGetLastDeployTimeReturnsAbsentWhenTimestampNil(t *testing.T) { fake := &fakeGalaxyServer{ deployReply: &pb.GetLastDeployTimeReply{Present: true, TimeOfLastDeploy: nil}, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() got, present, err := client.GetLastDeployTime(context.Background()) if err != nil { t.Fatalf("GetLastDeployTime() error = %v", err) } if present { t.Fatalf("present = true, want false (nil timestamp)") } if !got.IsZero() { t.Fatalf("time = %v, want zero", got) } } func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) { fake := &fakeGalaxyServer{ discoverReply: &pb.DiscoverHierarchyReply{ Objects: []*pb.GalaxyObject{ { GobjectId: 1, TagName: "TestMachine_001", ContainedName: "TestMachine_001", BrowseName: "TestMachine_001", IsArea: false, CategoryId: 7, TemplateChain: []string{"$Object", "$AppObject"}, Attributes: []*pb.GalaxyAttribute{ { AttributeName: "DownloadPath", FullTagReference: "TestMachine_001.DownloadPath", MxDataType: 8, DataTypeName: "String", }, }, }, { GobjectId: 2, TagName: "TestMachine_002", ContainedName: "TestMachine_002", ParentGobjectId: 1, }, }, }, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() objects, err := client.DiscoverHierarchy(context.Background()) if err != nil { t.Fatalf("DiscoverHierarchy() error = %v", err) } if len(objects) != 2 { t.Fatalf("len(objects) = %d, want 2", len(objects)) } if objects[0].GetTagName() != "TestMachine_001" { t.Fatalf("objects[0].TagName = %q", objects[0].GetTagName()) } if len(objects[0].GetAttributes()) != 1 { t.Fatalf("len(attributes) = %d, want 1", len(objects[0].GetAttributes())) } if objects[0].GetAttributes()[0].GetFullTagReference() != "TestMachine_001.DownloadPath" { t.Fatalf("FullTagReference = %q", objects[0].GetAttributes()[0].GetFullTagReference()) } } func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) { fake := &fakeGalaxyServer{failTest: true} client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() _, err := client.TestConnection(context.Background()) if err == nil { t.Fatal("TestConnection() error = nil, want error") } var gwErr *GatewayError if !errors.As(err, &gwErr) { t.Fatalf("error %T does not support errors.As(*GatewayError)", err) } if gwErr.Op != "galaxy test connection" { t.Fatalf("Op = %q, want %q", gwErr.Op, "galaxy test connection") } } func TestGalaxyWatchDeployEventsReceivesEventsInOrder(t *testing.T) { bootstrap := time.Date(2026, 4, 28, 10, 0, 0, 0, time.UTC) deploy1 := time.Date(2026, 4, 28, 10, 5, 0, 0, time.UTC) deploy2 := time.Date(2026, 4, 28, 10, 6, 0, 0, time.UTC) fake := &fakeGalaxyServer{ watchEvents: []*pb.DeployEvent{ { Sequence: 1, ObservedAt: timestamppb.New(bootstrap), TimeOfLastDeploy: timestamppb.New(deploy1), TimeOfLastDeployPresent: true, ObjectCount: 10, AttributeCount: 42, }, { Sequence: 2, ObservedAt: timestamppb.New(deploy2), TimeOfLastDeploy: timestamppb.New(deploy2), TimeOfLastDeployPresent: true, ObjectCount: 11, AttributeCount: 44, }, }, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() events, errs, err := client.WatchDeployEvents(ctx, nil) if err != nil { t.Fatalf("WatchDeployEvents() error = %v", err) } got := make([]*DeployEvent, 0, 2) loop: for { select { case ev, ok := <-events: if !ok { break loop } got = append(got, ev) case errVal := <-errs: if errVal != nil { t.Fatalf("error channel: %v", errVal) } case <-ctx.Done(): t.Fatalf("timeout waiting for events; got %d", len(got)) } } if len(got) != 2 { t.Fatalf("len(events) = %d, want 2", len(got)) } if got[0].GetSequence() != 1 || got[1].GetSequence() != 2 { t.Fatalf("sequences = [%d,%d], want [1,2]", got[0].GetSequence(), got[1].GetSequence()) } if !got[0].GetTimeOfLastDeployPresent() { t.Fatalf("event[0] TimeOfLastDeployPresent = false, want true") } if got[0].GetObjectCount() != 10 || got[0].GetAttributeCount() != 42 { t.Fatalf("event[0] counts = (%d,%d), want (10,42)", got[0].GetObjectCount(), got[0].GetAttributeCount()) } if !got[0].GetTimeOfLastDeploy().AsTime().Equal(deploy1) { t.Fatalf("event[0] TimeOfLastDeploy = %v, want %v", got[0].GetTimeOfLastDeploy().AsTime(), deploy1) } } func TestGalaxyWatchDeployEventsForwardsLastSeenDeployTime(t *testing.T) { fake := &fakeGalaxyServer{ watchEvents: []*pb.DeployEvent{ {Sequence: 7}, }, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() lastSeen := time.Date(2026, 4, 28, 9, 0, 0, 0, time.UTC) events, errs, err := client.WatchDeployEvents(ctx, &lastSeen) if err != nil { t.Fatalf("WatchDeployEvents() error = %v", err) } // Drain everything. loop: for { select { case _, ok := <-events: if !ok { break loop } case errVal := <-errs: if errVal != nil { t.Fatalf("error channel: %v", errVal) } case <-ctx.Done(): t.Fatalf("timeout draining events") } } if fake.watchRequest == nil { t.Fatalf("server did not receive a request") } gotTs := fake.watchRequest.GetLastSeenDeployTime() if gotTs == nil { t.Fatalf("LastSeenDeployTime = nil, want %v", lastSeen) } if !gotTs.AsTime().Equal(lastSeen) { t.Fatalf("LastSeenDeployTime = %v, want %v", gotTs.AsTime(), lastSeen) } } func TestGalaxyWatchDeployEventsCancelTearsDownStream(t *testing.T) { fake := &fakeGalaxyServer{ watchEvents: []*pb.DeployEvent{ {Sequence: 1}, }, watchHoldOpen: true, } client, cleanup := newGalaxyBufconnClient(t, fake) defer cleanup() streamCtx, cancelStream := context.WithCancel(context.Background()) events, errs, err := client.WatchDeployEvents(streamCtx, nil) if err != nil { t.Fatalf("WatchDeployEvents() error = %v", err) } // Wait for the bootstrap event to arrive. select { case ev, ok := <-events: if !ok { t.Fatalf("events channel closed before delivering bootstrap") } if ev.GetSequence() != 1 { t.Fatalf("got seq=%d, want 1", ev.GetSequence()) } case <-time.After(2 * time.Second): t.Fatalf("timeout waiting for bootstrap event") } // Cancel the stream; both channels must close cleanly without delivering an error. cancelStream() deadline := time.After(2 * time.Second) for events != nil || errs != nil { select { case _, ok := <-events: if !ok { events = nil } case errVal, ok := <-errs: if !ok { errs = nil continue } if errVal != nil { t.Fatalf("error after cancel: %v", errVal) } case <-deadline: t.Fatalf("channels did not close after cancel; events nil=%v errs nil=%v", events == nil, errs == nil) } } } func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient, func()) { t.Helper() listener := bufconn.Listen(bufSize) server := grpc.NewServer() pb.RegisterGalaxyRepositoryServer(server, fake) go func() { if err := server.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) { t.Errorf("bufconn server failed: %v", err) } }() dialer := func(ctx context.Context, _ string) (net.Conn, error) { return listener.DialContext(ctx) } // grpc.NewClient defaults to the dns scheme; use passthrough so the // bufconn fake target reaches the context dialer unresolved. client, err := DialGalaxy(context.Background(), Options{ Endpoint: "passthrough:///bufnet", APIKey: "test-api-key", Plaintext: true, DialOptions: []grpc.DialOption{ grpc.WithContextDialer(dialer), }, }) if err != nil { t.Fatalf("DialGalaxy() error = %v", err) } return client, func() { client.Close() server.Stop() listener.Close() } } type fakeGalaxyServer struct { pb.UnimplementedGalaxyRepositoryServer testReply *pb.TestConnectionReply testAuth string failTest bool deployReply *pb.GetLastDeployTimeReply discoverReply *pb.DiscoverHierarchyReply watchEvents []*pb.DeployEvent watchRequest *pb.WatchDeployEventsRequest watchHoldOpen bool } func (s *fakeGalaxyServer) TestConnection(ctx context.Context, req *pb.TestConnectionRequest) (*pb.TestConnectionReply, error) { s.testAuth = authorizationFromContext(ctx) if s.failTest { return nil, errors.New("simulated failure") } if s.testReply != nil { return s.testReply, nil } return &pb.TestConnectionReply{Ok: true}, nil } func (s *fakeGalaxyServer) GetLastDeployTime(ctx context.Context, req *pb.GetLastDeployTimeRequest) (*pb.GetLastDeployTimeReply, error) { if s.deployReply != nil { return s.deployReply, nil } return &pb.GetLastDeployTimeReply{Present: false}, nil } func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) { if s.discoverReply != nil { return s.discoverReply, nil } return &pb.DiscoverHierarchyReply{}, nil } func (s *fakeGalaxyServer) WatchDeployEvents(req *pb.WatchDeployEventsRequest, stream grpc.ServerStreamingServer[pb.DeployEvent]) error { s.watchRequest = req for _, event := range s.watchEvents { if err := stream.Send(event); err != nil { return err } } if s.watchHoldOpen { <-stream.Context().Done() } return nil }