From b4016e738c93913fb027b2908925ace6759e6949 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 30 Apr 2026 16:54:22 -0400 Subject: [PATCH] clients/go: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ninth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Go SDK. Depends on PR E.1 (regen, merged). - Client.AcknowledgeAlarm — context-aware unary call routed through the existing callContext helper (default 30s timeout). Failures wrap into *GatewayError; protocol-status non-OK promotes to typed protocol errors via EnsureProtocolSuccess. - Client.QueryActiveAlarms — context-streaming wrapper around the generated MxAccessGateway_QueryActiveAlarmsClient. Caller drives the stream via Recv(); cancelling ctx releases it. - types.go re-exports the four new generated types (AcknowledgeAlarmRequest/Reply, QueryActiveAlarmsRequest, ActiveAlarmSnapshot) plus the AlarmTransitionKind / AlarmConditionState enums and the QueryActiveAlarmsClient stream alias. - version.go bumps GatewayProtocolVersion 1 → 3 to match the .NET contract; the const was previously stale and the bump fixes the pre-existing TestOpenSessionFixtureProtocolVersions failure that was masked because the fixture had not been regenerated until A.1. Tests: - 4 new tests in alarms_test.go — request shape + auth metadata, nil-request rejection, Unauthenticated mapping, snapshot streaming over bufconn, filter-prefix passthrough. - All Go test suites green: cmd/mxgw-go + mxgateway. Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/go/mxgateway/alarms.go | 53 +++++++ clients/go/mxgateway/alarms_test.go | 238 ++++++++++++++++++++++++++++ clients/go/mxgateway/types.go | 22 +++ clients/go/mxgateway/version.go | 2 +- 4 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 clients/go/mxgateway/alarms.go create mode 100644 clients/go/mxgateway/alarms_test.go diff --git a/clients/go/mxgateway/alarms.go b/clients/go/mxgateway/alarms.go new file mode 100644 index 0000000..9fbd911 --- /dev/null +++ b/clients/go/mxgateway/alarms.go @@ -0,0 +1,53 @@ +package mxgateway + +import ( + "context" + "errors" +) + +// AcknowledgeAlarm acknowledges an active MXAccess alarm condition through the +// gateway. The gateway authenticates the request against the API key's +// invoke:alarm-ack scope and forwards the acknowledge to the worker's MXAccess +// session; the resulting native MxStatus is returned in the reply. +// +// Acks are idempotent — re-acking an already-acked condition is a no-op at +// the MxAccess layer. +func (c *Client) AcknowledgeAlarm(ctx context.Context, req *AcknowledgeAlarmRequest) (*AcknowledgeAlarmReply, error) { + if req == nil { + return nil, errors.New("mxgateway: acknowledge alarm request is required") + } + + callCtx, cancel := c.callContext(ctx) + defer cancel() + + reply, err := c.raw.AcknowledgeAlarm(callCtx, req) + if err != nil { + return nil, &GatewayError{Op: "acknowledge alarm", Err: err} + } + if err := EnsureProtocolSuccess("acknowledge alarm", reply.GetProtocolStatus(), nil); err != nil { + return reply, err + } + + return reply, nil +} + +// QueryActiveAlarms streams a snapshot of all alarms currently Active or +// ActiveAcked — the gateway's ConditionRefresh equivalent. Used after reconnect +// to seed local Part 9 state, or to reconcile alarms that may have been missed +// during a transport blip. +// +// The returned stream is owned by the caller; cancel ctx to release it. +// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the +// stream to a sub-tree. +func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRequest) (QueryActiveAlarmsClient, error) { + if req == nil { + return nil, errors.New("mxgateway: query active alarms request is required") + } + + stream, err := c.raw.QueryActiveAlarms(ctx, req) + if err != nil { + return nil, &GatewayError{Op: "query active alarms", Err: err} + } + + return stream, nil +} diff --git a/clients/go/mxgateway/alarms_test.go b/clients/go/mxgateway/alarms_test.go new file mode 100644 index 0000000..46b0c1f --- /dev/null +++ b/clients/go/mxgateway/alarms_test.go @@ -0,0 +1,238 @@ +package mxgateway + +import ( + "context" + "errors" + "io" + "net" + "testing" + + pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" +) + +// PR E.4 — pins the Go SDK surface for the new alarm RPCs: +// AcknowledgeAlarm + QueryActiveAlarms. + +func TestAcknowledgeAlarmSendsRequestAndReturnsReply(t *testing.T) { + fake := &fakeGatewayWithAlarms{ + acknowledgeReply: &pb.AcknowledgeAlarmReply{ + SessionId: "session-1", + CorrelationId: "corr-1", + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + Status: &pb.MxStatusProxy{ + Success: 1, + Category: pb.MxStatusCategory_MX_STATUS_CATEGORY_OK, + }, + }, + } + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + reply, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{ + SessionId: "session-1", + ClientCorrelationId: "corr-1", + AlarmFullReference: "Tank01.Level.HiHi", + Comment: "investigating", + OperatorUser: "alice", + }) + if err != nil { + t.Fatalf("AcknowledgeAlarm() error = %v", err) + } + if reply.GetProtocolStatus().GetCode() != pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK { + t.Fatalf("protocol status = %v", reply.GetProtocolStatus().GetCode()) + } + if got := fake.acknowledgeRequest.GetAlarmFullReference(); got != "Tank01.Level.HiHi" { + t.Fatalf("captured alarm reference = %q", got) + } + if got := fake.acknowledgeRequest.GetComment(); got != "investigating" { + t.Fatalf("captured comment = %q", got) + } + if got := fake.acknowledgeAuth; got != "Bearer test-api-key" { + t.Fatalf("authorization metadata = %q", got) + } +} + +func TestAcknowledgeAlarmRejectsNilRequest(t *testing.T) { + fake := &fakeGatewayWithAlarms{} + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + _, err := client.AcknowledgeAlarm(context.Background(), nil) + if err == nil || !errors.Is(err, errors.Unwrap(err)) && err.Error() != "mxgateway: acknowledge alarm request is required" { + // Accept either: the helper returned the literal sentinel, or the + // generic transport error — both prove nil was rejected. + } + if err == nil { + t.Fatalf("AcknowledgeAlarm(nil) returned no error") + } +} + +func TestAcknowledgeAlarmMapsUnauthenticated(t *testing.T) { + fake := &fakeGatewayWithAlarms{ + acknowledgeError: status.Error(codes.Unauthenticated, "expired key"), + } + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + _, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{ + SessionId: "session-1", + AlarmFullReference: "Tank01.Level.HiHi", + OperatorUser: "alice", + }) + if err == nil { + t.Fatalf("AcknowledgeAlarm() returned no error on Unauthenticated") + } + var gwErr *GatewayError + if !errors.As(err, &gwErr) { + t.Fatalf("error %T does not unwrap to *GatewayError", err) + } + if got, _ := status.FromError(gwErr.Err); got.Code() != codes.Unauthenticated { + t.Fatalf("inner status code = %v", got.Code()) + } +} + +func TestQueryActiveAlarmsStreamsSnapshots(t *testing.T) { + fake := &fakeGatewayWithAlarms{ + activeSnapshots: []*pb.ActiveAlarmSnapshot{ + { + AlarmFullReference: "Tank01.Level.HiHi", + CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE, + Severity: 750, + }, + { + AlarmFullReference: "Tank02.Level.HiHi", + CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED, + Severity: 750, + }, + }, + } + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{ + SessionId: "session-1", + }) + if err != nil { + t.Fatalf("QueryActiveAlarms() error = %v", err) + } + + var received []*pb.ActiveAlarmSnapshot + for { + snap, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("stream.Recv() error = %v", err) + } + received = append(received, snap) + } + if len(received) != 2 { + t.Fatalf("snapshot count = %d, want 2", len(received)) + } + if received[0].GetAlarmFullReference() != "Tank01.Level.HiHi" { + t.Fatalf("snapshot[0] ref = %q", received[0].GetAlarmFullReference()) + } + if received[1].GetCurrentState() != pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED { + t.Fatalf("snapshot[1] state = %v", received[1].GetCurrentState()) + } +} + +func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) { + fake := &fakeGatewayWithAlarms{} + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{ + SessionId: "session-1", + AlarmFilterPrefix: "Tank01.", + }) + if err != nil { + t.Fatalf("QueryActiveAlarms() error = %v", err) + } + for { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("stream.Recv() error = %v", err) + } + } + + if got := fake.queryRequest.GetAlarmFilterPrefix(); got != "Tank01." { + t.Fatalf("captured filter prefix = %q", got) + } +} + +type fakeGatewayWithAlarms struct { + pb.UnimplementedMxAccessGatewayServer + + acknowledgeRequest *pb.AcknowledgeAlarmRequest + acknowledgeReply *pb.AcknowledgeAlarmReply + acknowledgeError error + acknowledgeAuth string + + queryRequest *pb.QueryActiveAlarmsRequest + activeSnapshots []*pb.ActiveAlarmSnapshot +} + +func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.AcknowledgeAlarmRequest) (*pb.AcknowledgeAlarmReply, error) { + s.acknowledgeRequest = req + s.acknowledgeAuth = authorizationFromContext(ctx) + if s.acknowledgeError != nil { + return nil, s.acknowledgeError + } + if s.acknowledgeReply != nil { + return s.acknowledgeReply, nil + } + return &pb.AcknowledgeAlarmReply{ + SessionId: req.GetSessionId(), + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + }, nil +} + +func (s *fakeGatewayWithAlarms) QueryActiveAlarms(req *pb.QueryActiveAlarmsRequest, stream grpc.ServerStreamingServer[pb.ActiveAlarmSnapshot]) error { + s.queryRequest = req + for _, snap := range s.activeSnapshots { + if err := stream.Send(snap); err != nil { + return err + } + } + return nil +} + +func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) { + t.Helper() + listener := bufconn.Listen(bufSize) + server := grpc.NewServer() + pb.RegisterMxAccessGatewayServer(server, fake) + go func() { + _ = server.Serve(listener) + }() + dialer := func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + } + client, err := Dial(context.Background(), Options{ + Endpoint: "bufnet", + APIKey: "test-api-key", + Plaintext: true, + DialOptions: []grpc.DialOption{grpc.WithContextDialer(dialer)}, + }) + if err != nil { + t.Fatalf("Dial() error = %v", err) + } + return client, func() { + client.Close() + server.Stop() + listener.Close() + } +} diff --git a/clients/go/mxgateway/types.go b/clients/go/mxgateway/types.go index edba63c..b942b9e 100644 --- a/clients/go/mxgateway/types.go +++ b/clients/go/mxgateway/types.go @@ -80,8 +80,30 @@ type ( SubscribeResult = pb.SubscribeResult // BulkSubscribeReply aggregates SubscribeResult entries for a bulk command. BulkSubscribeReply = pb.BulkSubscribeReply + // AcknowledgeAlarmRequest is the gateway AcknowledgeAlarm request message. + AcknowledgeAlarmRequest = pb.AcknowledgeAlarmRequest + // AcknowledgeAlarmReply is the gateway AcknowledgeAlarm reply message. + AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply + // QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message. + QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest + // ActiveAlarmSnapshot is one row in a ConditionRefresh stream. + ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot + // OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents. + OnAlarmTransitionEvent = pb.OnAlarmTransitionEvent ) +// AlarmTransitionKind discriminates raise / acknowledge / clear / retrigger +// transitions on an OnAlarmTransitionEvent. +type AlarmTransitionKind = pb.AlarmTransitionKind + +// AlarmConditionState reports the current state of an active alarm in a +// ConditionRefresh snapshot. +type AlarmConditionState = pb.AlarmConditionState + +// QueryActiveAlarmsClient is the generated server-streaming client for the +// QueryActiveAlarms RPC. +type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient + // Enumerations from the generated contract re-exported for client callers. type ( // MxCommandKind discriminates which MXAccess command an MxCommand carries. diff --git a/clients/go/mxgateway/version.go b/clients/go/mxgateway/version.go index 66fd475..5aa56d7 100644 --- a/clients/go/mxgateway/version.go +++ b/clients/go/mxgateway/version.go @@ -7,7 +7,7 @@ const ( // GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion // in the shared .NET contracts. - GatewayProtocolVersion uint32 = 1 + GatewayProtocolVersion uint32 = 3 // WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion // and is exposed for fake-worker and parity tests. -- 2.52.0