Merge pull request 'clients/go: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.4)' (#108) from track-e4-go-alarm-sdk into main
This commit was merged in pull request #108.
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
package mxgateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
// AcknowledgeAlarm acknowledges an active MXAccess alarm condition through the
|
||||
// gateway. The gateway authenticates the request against the API key's
|
||||
// invoke:alarm-ack scope and forwards the acknowledge to the worker's MXAccess
|
||||
// session; the resulting native MxStatus is returned in the reply.
|
||||
//
|
||||
// Acks are idempotent — re-acking an already-acked condition is a no-op at
|
||||
// the MxAccess layer.
|
||||
func (c *Client) AcknowledgeAlarm(ctx context.Context, req *AcknowledgeAlarmRequest) (*AcknowledgeAlarmReply, error) {
|
||||
if req == nil {
|
||||
return nil, errors.New("mxgateway: acknowledge alarm request is required")
|
||||
}
|
||||
|
||||
callCtx, cancel := c.callContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
reply, err := c.raw.AcknowledgeAlarm(callCtx, req)
|
||||
if err != nil {
|
||||
return nil, &GatewayError{Op: "acknowledge alarm", Err: err}
|
||||
}
|
||||
if err := EnsureProtocolSuccess("acknowledge alarm", reply.GetProtocolStatus(), nil); err != nil {
|
||||
return reply, err
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
// QueryActiveAlarms streams a snapshot of all alarms currently Active or
|
||||
// ActiveAcked — the gateway's ConditionRefresh equivalent. Used after reconnect
|
||||
// to seed local Part 9 state, or to reconcile alarms that may have been missed
|
||||
// during a transport blip.
|
||||
//
|
||||
// The returned stream is owned by the caller; cancel ctx to release it.
|
||||
// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the
|
||||
// stream to a sub-tree.
|
||||
func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRequest) (QueryActiveAlarmsClient, error) {
|
||||
if req == nil {
|
||||
return nil, errors.New("mxgateway: query active alarms request is required")
|
||||
}
|
||||
|
||||
stream, err := c.raw.QueryActiveAlarms(ctx, req)
|
||||
if err != nil {
|
||||
return nil, &GatewayError{Op: "query active alarms", Err: err}
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
@@ -0,0 +1,238 @@
|
||||
package mxgateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
)
|
||||
|
||||
// PR E.4 — pins the Go SDK surface for the new alarm RPCs:
|
||||
// AcknowledgeAlarm + QueryActiveAlarms.
|
||||
|
||||
func TestAcknowledgeAlarmSendsRequestAndReturnsReply(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
acknowledgeReply: &pb.AcknowledgeAlarmReply{
|
||||
SessionId: "session-1",
|
||||
CorrelationId: "corr-1",
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
Status: &pb.MxStatusProxy{
|
||||
Success: 1,
|
||||
Category: pb.MxStatusCategory_MX_STATUS_CATEGORY_OK,
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
reply, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
|
||||
SessionId: "session-1",
|
||||
ClientCorrelationId: "corr-1",
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
Comment: "investigating",
|
||||
OperatorUser: "alice",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("AcknowledgeAlarm() error = %v", err)
|
||||
}
|
||||
if reply.GetProtocolStatus().GetCode() != pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK {
|
||||
t.Fatalf("protocol status = %v", reply.GetProtocolStatus().GetCode())
|
||||
}
|
||||
if got := fake.acknowledgeRequest.GetAlarmFullReference(); got != "Tank01.Level.HiHi" {
|
||||
t.Fatalf("captured alarm reference = %q", got)
|
||||
}
|
||||
if got := fake.acknowledgeRequest.GetComment(); got != "investigating" {
|
||||
t.Fatalf("captured comment = %q", got)
|
||||
}
|
||||
if got := fake.acknowledgeAuth; got != "Bearer test-api-key" {
|
||||
t.Fatalf("authorization metadata = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAcknowledgeAlarmRejectsNilRequest(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
_, err := client.AcknowledgeAlarm(context.Background(), nil)
|
||||
if err == nil || !errors.Is(err, errors.Unwrap(err)) && err.Error() != "mxgateway: acknowledge alarm request is required" {
|
||||
// Accept either: the helper returned the literal sentinel, or the
|
||||
// generic transport error — both prove nil was rejected.
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatalf("AcknowledgeAlarm(nil) returned no error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAcknowledgeAlarmMapsUnauthenticated(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
acknowledgeError: status.Error(codes.Unauthenticated, "expired key"),
|
||||
}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
_, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
|
||||
SessionId: "session-1",
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
OperatorUser: "alice",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("AcknowledgeAlarm() returned no error on Unauthenticated")
|
||||
}
|
||||
var gwErr *GatewayError
|
||||
if !errors.As(err, &gwErr) {
|
||||
t.Fatalf("error %T does not unwrap to *GatewayError", err)
|
||||
}
|
||||
if got, _ := status.FromError(gwErr.Err); got.Code() != codes.Unauthenticated {
|
||||
t.Fatalf("inner status code = %v", got.Code())
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryActiveAlarmsStreamsSnapshots(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
activeSnapshots: []*pb.ActiveAlarmSnapshot{
|
||||
{
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE,
|
||||
Severity: 750,
|
||||
},
|
||||
{
|
||||
AlarmFullReference: "Tank02.Level.HiHi",
|
||||
CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED,
|
||||
Severity: 750,
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
||||
SessionId: "session-1",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
||||
}
|
||||
|
||||
var received []*pb.ActiveAlarmSnapshot
|
||||
for {
|
||||
snap, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("stream.Recv() error = %v", err)
|
||||
}
|
||||
received = append(received, snap)
|
||||
}
|
||||
if len(received) != 2 {
|
||||
t.Fatalf("snapshot count = %d, want 2", len(received))
|
||||
}
|
||||
if received[0].GetAlarmFullReference() != "Tank01.Level.HiHi" {
|
||||
t.Fatalf("snapshot[0] ref = %q", received[0].GetAlarmFullReference())
|
||||
}
|
||||
if received[1].GetCurrentState() != pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED {
|
||||
t.Fatalf("snapshot[1] state = %v", received[1].GetCurrentState())
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
||||
SessionId: "session-1",
|
||||
AlarmFilterPrefix: "Tank01.",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
||||
}
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("stream.Recv() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if got := fake.queryRequest.GetAlarmFilterPrefix(); got != "Tank01." {
|
||||
t.Fatalf("captured filter prefix = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeGatewayWithAlarms struct {
|
||||
pb.UnimplementedMxAccessGatewayServer
|
||||
|
||||
acknowledgeRequest *pb.AcknowledgeAlarmRequest
|
||||
acknowledgeReply *pb.AcknowledgeAlarmReply
|
||||
acknowledgeError error
|
||||
acknowledgeAuth string
|
||||
|
||||
queryRequest *pb.QueryActiveAlarmsRequest
|
||||
activeSnapshots []*pb.ActiveAlarmSnapshot
|
||||
}
|
||||
|
||||
func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.AcknowledgeAlarmRequest) (*pb.AcknowledgeAlarmReply, error) {
|
||||
s.acknowledgeRequest = req
|
||||
s.acknowledgeAuth = authorizationFromContext(ctx)
|
||||
if s.acknowledgeError != nil {
|
||||
return nil, s.acknowledgeError
|
||||
}
|
||||
if s.acknowledgeReply != nil {
|
||||
return s.acknowledgeReply, nil
|
||||
}
|
||||
return &pb.AcknowledgeAlarmReply{
|
||||
SessionId: req.GetSessionId(),
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *fakeGatewayWithAlarms) QueryActiveAlarms(req *pb.QueryActiveAlarmsRequest, stream grpc.ServerStreamingServer[pb.ActiveAlarmSnapshot]) error {
|
||||
s.queryRequest = req
|
||||
for _, snap := range s.activeSnapshots {
|
||||
if err := stream.Send(snap); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) {
|
||||
t.Helper()
|
||||
listener := bufconn.Listen(bufSize)
|
||||
server := grpc.NewServer()
|
||||
pb.RegisterMxAccessGatewayServer(server, fake)
|
||||
go func() {
|
||||
_ = server.Serve(listener)
|
||||
}()
|
||||
dialer := func(ctx context.Context, _ string) (net.Conn, error) {
|
||||
return listener.DialContext(ctx)
|
||||
}
|
||||
client, err := Dial(context.Background(), Options{
|
||||
Endpoint: "bufnet",
|
||||
APIKey: "test-api-key",
|
||||
Plaintext: true,
|
||||
DialOptions: []grpc.DialOption{grpc.WithContextDialer(dialer)},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Dial() error = %v", err)
|
||||
}
|
||||
return client, func() {
|
||||
client.Close()
|
||||
server.Stop()
|
||||
listener.Close()
|
||||
}
|
||||
}
|
||||
@@ -80,8 +80,30 @@ type (
|
||||
SubscribeResult = pb.SubscribeResult
|
||||
// BulkSubscribeReply aggregates SubscribeResult entries for a bulk command.
|
||||
BulkSubscribeReply = pb.BulkSubscribeReply
|
||||
// AcknowledgeAlarmRequest is the gateway AcknowledgeAlarm request message.
|
||||
AcknowledgeAlarmRequest = pb.AcknowledgeAlarmRequest
|
||||
// AcknowledgeAlarmReply is the gateway AcknowledgeAlarm reply message.
|
||||
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
||||
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
||||
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
||||
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
||||
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
||||
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
||||
OnAlarmTransitionEvent = pb.OnAlarmTransitionEvent
|
||||
)
|
||||
|
||||
// AlarmTransitionKind discriminates raise / acknowledge / clear / retrigger
|
||||
// transitions on an OnAlarmTransitionEvent.
|
||||
type AlarmTransitionKind = pb.AlarmTransitionKind
|
||||
|
||||
// AlarmConditionState reports the current state of an active alarm in a
|
||||
// ConditionRefresh snapshot.
|
||||
type AlarmConditionState = pb.AlarmConditionState
|
||||
|
||||
// QueryActiveAlarmsClient is the generated server-streaming client for the
|
||||
// QueryActiveAlarms RPC.
|
||||
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
||||
|
||||
// Enumerations from the generated contract re-exported for client callers.
|
||||
type (
|
||||
// MxCommandKind discriminates which MXAccess command an MxCommand carries.
|
||||
|
||||
@@ -7,7 +7,7 @@ const (
|
||||
|
||||
// GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion
|
||||
// in the shared .NET contracts.
|
||||
GatewayProtocolVersion uint32 = 1
|
||||
GatewayProtocolVersion uint32 = 3
|
||||
|
||||
// WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion
|
||||
// and is exposed for fake-worker and parity tests.
|
||||
|
||||
Reference in New Issue
Block a user