Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty cf20142634 Fix Galaxy projection and constraint review findings 2026-04-29 14:24:11 -04:00
Joseph Doherty b995c174eb Implement Galaxy filters and API key constraints 2026-04-29 13:37:00 -04:00
Joseph Doherty ac2787f619 Fix Galaxy paging review findings 2026-04-29 11:59:49 -04:00
Joseph Doherty d543679044 Fix runtime review findings 2026-04-29 10:39:49 -04:00
112 changed files with 7614 additions and 503 deletions
@@ -814,9 +814,7 @@ public static class MxGatewayClientCli
TextWriter output,
CancellationToken cancellationToken)
{
DiscoverHierarchyReply reply = await client.GalaxyDiscoverHierarchyAsync(
new DiscoverHierarchyRequest(),
cancellationToken)
DiscoverHierarchyReply reply = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken)
.ConfigureAwait(false);
if (arguments.HasFlag("json"))
@@ -834,6 +832,39 @@ public static class MxGatewayClientCli
return 0;
}
private static async Task<DiscoverHierarchyReply> DiscoverAllGalaxyHierarchyAsync(
IMxGatewayCliClient client,
CancellationToken cancellationToken)
{
DiscoverHierarchyReply aggregate = new();
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
string pageToken = string.Empty;
do
{
DiscoverHierarchyReply page = await client.GalaxyDiscoverHierarchyAsync(
new DiscoverHierarchyRequest
{
PageSize = 5000,
PageToken = pageToken,
},
cancellationToken)
.ConfigureAwait(false);
aggregate.Objects.Add(page.Objects);
aggregate.TotalObjectCount = page.TotalObjectCount;
pageToken = page.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken)
&& !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
return aggregate;
}
private static async Task<int> GalaxyWatchAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -21,6 +21,8 @@ internal sealed class FakeGalaxyRepositoryTransport(MxGatewayClientOptions optio
public DiscoverHierarchyReply DiscoverHierarchyReply { get; set; } = new();
public Queue<DiscoverHierarchyReply> DiscoverHierarchyReplies { get; } = new();
public Queue<Exception> TestConnectionExceptions { get; } = new();
public Queue<Exception> GetLastDeployTimeExceptions { get; } = new();
@@ -63,7 +65,10 @@ internal sealed class FakeGalaxyRepositoryTransport(MxGatewayClientOptions optio
throw exception;
}
return Task.FromResult(DiscoverHierarchyReply);
return Task.FromResult(
DiscoverHierarchyReplies.TryDequeue(out DiscoverHierarchyReply? reply)
? reply
: DiscoverHierarchyReply);
}
public List<(WatchDeployEventsRequest Request, CallOptions CallOptions)> WatchDeployEventsCalls { get; } = [];
@@ -68,8 +68,10 @@ public sealed class GalaxyRepositoryClientTests
public async Task DiscoverHierarchyAsync_ReturnsObjectsFromReply()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.DiscoverHierarchyReply = new DiscoverHierarchyReply
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "page-2",
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
@@ -91,12 +93,29 @@ public sealed class GalaxyRepositoryClientTests
},
},
},
};
});
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
{
GobjectId = 13,
TagName = "DelmiaReceiver_002",
},
},
});
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<GalaxyObject> objects = await client.DiscoverHierarchyAsync();
GalaxyObject obj = Assert.Single(objects);
Assert.Equal(2, objects.Count);
Assert.Equal(2, transport.DiscoverHierarchyCalls.Count);
Assert.Equal(5000, transport.DiscoverHierarchyCalls[0].Request.PageSize);
Assert.Equal("", transport.DiscoverHierarchyCalls[0].Request.PageToken);
Assert.Equal("page-2", transport.DiscoverHierarchyCalls[1].Request.PageToken);
GalaxyObject obj = objects[0];
Assert.Equal(12, obj.GobjectId);
Assert.Equal("DelmiaReceiver_001", obj.TagName);
GalaxyAttribute attribute = Assert.Single(obj.Attributes);
@@ -121,6 +140,57 @@ public sealed class GalaxyRepositoryClientTests
Assert.False(call.CallOptions.CancellationToken.IsCancellationRequested);
}
[Fact]
public async Task DiscoverHierarchyAsync_WithRepeatedPageToken_ThrowsProtocolError()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
});
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
});
await using GalaxyRepositoryClient client = CreateClient(transport);
MxGatewayException exception = await Assert.ThrowsAsync<MxGatewayException>(
async () => await client.DiscoverHierarchyAsync());
Assert.Contains("repeated page token", exception.Message, StringComparison.Ordinal);
}
[Fact]
public async Task DiscoverHierarchyAsync_WithOptions_MapsTypedFilters()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
await using GalaxyRepositoryClient client = CreateClient(transport);
await client.DiscoverHierarchyAsync(new DiscoverHierarchyOptions
{
RootContainedPath = "Area1/Line3",
MaxDepth = 2,
CategoryIds = [10, 13],
TemplateChainContains = ["Pump"],
TagNameGlob = "Pump_*",
IncludeAttributes = false,
AlarmBearingOnly = true,
HistorizedOnly = true,
});
DiscoverHierarchyRequest request = Assert.Single(transport.DiscoverHierarchyCalls).Request;
Assert.Equal(DiscoverHierarchyRequest.RootOneofCase.RootContainedPath, request.RootCase);
Assert.Equal("Area1/Line3", request.RootContainedPath);
Assert.Equal(2, request.MaxDepth);
Assert.Equal([10, 13], request.CategoryIds);
Assert.Equal(["Pump"], request.TemplateChainContains);
Assert.Equal("Pump_*", request.TagNameGlob);
Assert.True(request.HasIncludeAttributes);
Assert.False(request.IncludeAttributes);
Assert.True(request.AlarmBearingOnly);
Assert.True(request.HistorizedOnly);
}
[Fact]
public async Task TestConnectionAsync_RetriesOnTransientGrpcFailure()
{
@@ -16,7 +16,7 @@ public sealed class MxGatewayClientCliTests
var exitCode = MxGatewayClientCli.Run(["version"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("gateway-protocol=1", output.ToString());
Assert.Contains("gateway-protocol=2", output.ToString());
Assert.Contains("worker-protocol=1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
@@ -30,7 +30,7 @@ public sealed class MxGatewayClientCliTests
int exitCode = await MxGatewayClientCli.RunAsync(["version", "--json"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("\"gatewayProtocolVersion\":1", output.ToString());
Assert.Contains("\"gatewayProtocolVersion\":2", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
@@ -207,8 +207,10 @@ public sealed class MxGatewayClientCliTests
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.GalaxyDiscoverHierarchyReply = new DiscoverHierarchyReply
fakeClient.GalaxyDiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
@@ -227,7 +229,21 @@ public sealed class MxGatewayClientCliTests
},
},
},
};
});
fakeClient.GalaxyDiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
{
GobjectId = 8,
TagName = "DelmiaReceiver_002",
ContainedName = "DelmiaReceiver",
ParentGobjectId = 1,
},
},
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
@@ -242,10 +258,14 @@ public sealed class MxGatewayClientCliTests
_ => fakeClient);
Assert.Equal(0, exitCode);
Assert.Single(fakeClient.GalaxyDiscoverHierarchyRequests);
Assert.Equal(2, fakeClient.GalaxyDiscoverHierarchyRequests.Count);
Assert.Equal(5000, fakeClient.GalaxyDiscoverHierarchyRequests[0].PageSize);
Assert.Equal("", fakeClient.GalaxyDiscoverHierarchyRequests[0].PageToken);
Assert.Equal("7:1", fakeClient.GalaxyDiscoverHierarchyRequests[1].PageToken);
string text = output.ToString();
Assert.Contains("objects=1", text);
Assert.Contains("objects=2", text);
Assert.Contains("DelmiaReceiver_001", text);
Assert.Contains("DelmiaReceiver_002", text);
Assert.Contains("attributes=1", text);
Assert.Equal(string.Empty, error.ToString());
}
@@ -411,6 +431,8 @@ public sealed class MxGatewayClientCliTests
public DiscoverHierarchyReply GalaxyDiscoverHierarchyReply { get; set; } = new();
public Queue<DiscoverHierarchyReply> GalaxyDiscoverHierarchyReplies { get; } = new();
public List<TestConnectionRequest> GalaxyTestConnectionRequests { get; } = [];
public List<GetLastDeployTimeRequest> GalaxyGetLastDeployTimeRequests { get; } = [];
@@ -438,7 +460,10 @@ public sealed class MxGatewayClientCliTests
CancellationToken cancellationToken)
{
GalaxyDiscoverHierarchyRequests.Add(request);
return Task.FromResult(GalaxyDiscoverHierarchyReply);
return Task.FromResult(
GalaxyDiscoverHierarchyReplies.TryDequeue(out DiscoverHierarchyReply? reply)
? reply
: GalaxyDiscoverHierarchyReply);
}
public List<WatchDeployEventsRequest> GalaxyWatchDeployEventsRequests { get; } = [];
@@ -0,0 +1,24 @@
namespace MxGateway.Client;
public sealed record DiscoverHierarchyOptions
{
public int? RootGobjectId { get; init; }
public string? RootTagName { get; init; }
public string? RootContainedPath { get; init; }
public int? MaxDepth { get; init; }
public IReadOnlyList<int> CategoryIds { get; init; } = Array.Empty<int>();
public IReadOnlyList<string> TemplateChainContains { get; init; } = Array.Empty<string>();
public string? TagNameGlob { get; init; }
public bool? IncludeAttributes { get; init; }
public bool AlarmBearingOnly { get; init; }
public bool HistorizedOnly { get; init; }
}
@@ -18,6 +18,8 @@ namespace MxGateway.Client;
/// </summary>
public sealed class GalaxyRepositoryClient : IAsyncDisposable
{
private const int DiscoverHierarchyPageSize = 5000;
private readonly GrpcChannel? _channel;
private readonly IGalaxyRepositoryClientTransport _transport;
private readonly ResiliencePipeline _safeUnaryRetryPipeline;
@@ -68,6 +70,8 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
{
HttpHandler = handler,
LoggerFactory = options.LoggerFactory,
MaxReceiveMessageSize = options.MaxGrpcMessageBytes,
MaxSendMessageSize = options.MaxGrpcMessageBytes,
});
return new GalaxyRepositoryClient(
@@ -141,12 +145,81 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
/// </summary>
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(CancellationToken cancellationToken = default)
{
DiscoverHierarchyReply reply = await DiscoverHierarchyRawAsync(
new DiscoverHierarchyRequest(),
cancellationToken)
.ConfigureAwait(false);
return await DiscoverHierarchyAsync(new DiscoverHierarchyOptions(), cancellationToken).ConfigureAwait(false);
}
return reply.Objects;
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(
DiscoverHierarchyOptions options,
CancellationToken cancellationToken = default)
{
List<GalaxyObject> objects = [];
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
string pageToken = string.Empty;
do
{
DiscoverHierarchyRequest request = CreateDiscoverHierarchyRequest(options);
request.PageSize = DiscoverHierarchyPageSize;
request.PageToken = pageToken;
DiscoverHierarchyReply reply = await DiscoverHierarchyRawAsync(
request,
cancellationToken)
.ConfigureAwait(false);
objects.AddRange(reply.Objects);
pageToken = reply.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken)
&& !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
return objects;
}
private static DiscoverHierarchyRequest CreateDiscoverHierarchyRequest(DiscoverHierarchyOptions options)
{
ArgumentNullException.ThrowIfNull(options);
DiscoverHierarchyRequest request = new()
{
AlarmBearingOnly = options.AlarmBearingOnly,
HistorizedOnly = options.HistorizedOnly,
};
if (options.RootGobjectId.HasValue)
{
request.RootGobjectId = options.RootGobjectId.Value;
}
else if (!string.IsNullOrWhiteSpace(options.RootTagName))
{
request.RootTagName = options.RootTagName;
}
else if (!string.IsNullOrWhiteSpace(options.RootContainedPath))
{
request.RootContainedPath = options.RootContainedPath;
}
if (options.MaxDepth.HasValue)
{
request.MaxDepth = options.MaxDepth.Value;
}
request.CategoryIds.Add(options.CategoryIds);
request.TemplateChainContains.Add(options.TemplateChainContains);
if (!string.IsNullOrWhiteSpace(options.TagNameGlob))
{
request.TagNameGlob = options.TagNameGlob;
}
if (options.IncludeAttributes.HasValue)
{
request.IncludeAttributes = options.IncludeAttributes.Value;
}
return request;
}
public Task<DiscoverHierarchyReply> DiscoverHierarchyRawAsync(
@@ -64,6 +64,8 @@ public sealed class MxGatewayClient : IAsyncDisposable
{
HttpHandler = handler,
LoggerFactory = options.LoggerFactory,
MaxReceiveMessageSize = options.MaxGrpcMessageBytes,
MaxSendMessageSize = options.MaxGrpcMessageBytes,
});
return new MxGatewayClient(
@@ -23,6 +23,8 @@ public sealed class MxGatewayClientOptions
public TimeSpan? StreamTimeout { get; init; }
public int MaxGrpcMessageBytes { get; init; } = 16 * 1024 * 1024;
public MxGatewayClientRetryOptions Retry { get; init; } = new();
public ILoggerFactory? LoggerFactory { get; init; }
@@ -66,6 +68,13 @@ public sealed class MxGatewayClientOptions
"The stream timeout must be greater than zero when configured.");
}
if (MaxGrpcMessageBytes <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(MaxGrpcMessageBytes),
"The maximum gRPC message size must be greater than zero.");
}
if (UseTls && Endpoint.Scheme != Uri.UriSchemeHttps)
{
throw new ArgumentException(
+13
View File
@@ -164,6 +164,19 @@ foreach (GalaxyObject galaxyObject in objects)
}
```
Use `DiscoverHierarchyOptions` to request a server-side slice without pulling
the full Galaxy:
```csharp
IReadOnlyList<GalaxyObject> pumps = await repository.DiscoverHierarchyAsync(
new DiscoverHierarchyOptions
{
RootContainedPath = "Area1/Line3",
TagNameGlob = "Pump_*",
IncludeAttributes = false,
});
```
The CLI exposes the same operations:
```powershell
@@ -10,6 +10,7 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
@@ -191,9 +192,38 @@ func (x *GetLastDeployTimeReply) GetTimeOfLastDeploy() *timestamppb.Timestamp {
}
type DiscoverHierarchyRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
// Maximum number of objects to return. The server applies its default when
// unset and rejects non-positive values.
PageSize int32 `protobuf:"varint,1,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"`
// Opaque token returned by a previous DiscoverHierarchy response.
PageToken string `protobuf:"bytes,2,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"`
// Optional. When set, return only this object and its descendants.
// Empty = full hierarchy.
//
// Types that are valid to be assigned to Root:
//
// *DiscoverHierarchyRequest_RootGobjectId
// *DiscoverHierarchyRequest_RootTagName
// *DiscoverHierarchyRequest_RootContainedPath
Root isDiscoverHierarchyRequest_Root `protobuf_oneof:"root"`
// Optional. Cap on descendant depth from root. Zero returns only the root.
// Unset means unlimited depth.
MaxDepth *wrapperspb.Int32Value `protobuf:"bytes,6,opt,name=max_depth,json=maxDepth,proto3" json:"max_depth,omitempty"`
// Optional object category id filters.
CategoryIds []int32 `protobuf:"varint,7,rep,packed,name=category_ids,json=categoryIds,proto3" json:"category_ids,omitempty"`
// Optional case-insensitive substring filters against template names.
TemplateChainContains []string `protobuf:"bytes,8,rep,name=template_chain_contains,json=templateChainContains,proto3" json:"template_chain_contains,omitempty"`
// Optional anchored, case-insensitive glob over object tag_name.
TagNameGlob string `protobuf:"bytes,9,opt,name=tag_name_glob,json=tagNameGlob,proto3" json:"tag_name_glob,omitempty"`
// Optional. Unset or true includes attributes. False returns object skeletons.
IncludeAttributes *bool `protobuf:"varint,10,opt,name=include_attributes,json=includeAttributes,proto3,oneof" json:"include_attributes,omitempty"`
// Optional. Return only objects with at least one alarm-bearing attribute.
AlarmBearingOnly bool `protobuf:"varint,11,opt,name=alarm_bearing_only,json=alarmBearingOnly,proto3" json:"alarm_bearing_only,omitempty"`
// Optional. Return only objects with at least one historized attribute.
HistorizedOnly bool `protobuf:"varint,12,opt,name=historized_only,json=historizedOnly,proto3" json:"historized_only,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DiscoverHierarchyRequest) Reset() {
@@ -226,11 +256,134 @@ func (*DiscoverHierarchyRequest) Descriptor() ([]byte, []int) {
return file_galaxy_repository_proto_rawDescGZIP(), []int{4}
}
func (x *DiscoverHierarchyRequest) GetPageSize() int32 {
if x != nil {
return x.PageSize
}
return 0
}
func (x *DiscoverHierarchyRequest) GetPageToken() string {
if x != nil {
return x.PageToken
}
return ""
}
func (x *DiscoverHierarchyRequest) GetRoot() isDiscoverHierarchyRequest_Root {
if x != nil {
return x.Root
}
return nil
}
func (x *DiscoverHierarchyRequest) GetRootGobjectId() int32 {
if x != nil {
if x, ok := x.Root.(*DiscoverHierarchyRequest_RootGobjectId); ok {
return x.RootGobjectId
}
}
return 0
}
func (x *DiscoverHierarchyRequest) GetRootTagName() string {
if x != nil {
if x, ok := x.Root.(*DiscoverHierarchyRequest_RootTagName); ok {
return x.RootTagName
}
}
return ""
}
func (x *DiscoverHierarchyRequest) GetRootContainedPath() string {
if x != nil {
if x, ok := x.Root.(*DiscoverHierarchyRequest_RootContainedPath); ok {
return x.RootContainedPath
}
}
return ""
}
func (x *DiscoverHierarchyRequest) GetMaxDepth() *wrapperspb.Int32Value {
if x != nil {
return x.MaxDepth
}
return nil
}
func (x *DiscoverHierarchyRequest) GetCategoryIds() []int32 {
if x != nil {
return x.CategoryIds
}
return nil
}
func (x *DiscoverHierarchyRequest) GetTemplateChainContains() []string {
if x != nil {
return x.TemplateChainContains
}
return nil
}
func (x *DiscoverHierarchyRequest) GetTagNameGlob() string {
if x != nil {
return x.TagNameGlob
}
return ""
}
func (x *DiscoverHierarchyRequest) GetIncludeAttributes() bool {
if x != nil && x.IncludeAttributes != nil {
return *x.IncludeAttributes
}
return false
}
func (x *DiscoverHierarchyRequest) GetAlarmBearingOnly() bool {
if x != nil {
return x.AlarmBearingOnly
}
return false
}
func (x *DiscoverHierarchyRequest) GetHistorizedOnly() bool {
if x != nil {
return x.HistorizedOnly
}
return false
}
type isDiscoverHierarchyRequest_Root interface {
isDiscoverHierarchyRequest_Root()
}
type DiscoverHierarchyRequest_RootGobjectId struct {
RootGobjectId int32 `protobuf:"varint,3,opt,name=root_gobject_id,json=rootGobjectId,proto3,oneof"`
}
type DiscoverHierarchyRequest_RootTagName struct {
RootTagName string `protobuf:"bytes,4,opt,name=root_tag_name,json=rootTagName,proto3,oneof"`
}
type DiscoverHierarchyRequest_RootContainedPath struct {
RootContainedPath string `protobuf:"bytes,5,opt,name=root_contained_path,json=rootContainedPath,proto3,oneof"`
}
func (*DiscoverHierarchyRequest_RootGobjectId) isDiscoverHierarchyRequest_Root() {}
func (*DiscoverHierarchyRequest_RootTagName) isDiscoverHierarchyRequest_Root() {}
func (*DiscoverHierarchyRequest_RootContainedPath) isDiscoverHierarchyRequest_Root() {}
type DiscoverHierarchyReply struct {
state protoimpl.MessageState `protogen:"open.v1"`
Objects []*GalaxyObject `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Objects []*GalaxyObject `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
// Non-empty when another page is available.
NextPageToken string `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"`
// Total number of objects in the cached hierarchy at the time of the call.
TotalObjectCount int32 `protobuf:"varint,3,opt,name=total_object_count,json=totalObjectCount,proto3" json:"total_object_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DiscoverHierarchyReply) Reset() {
@@ -270,6 +423,20 @@ func (x *DiscoverHierarchyReply) GetObjects() []*GalaxyObject {
return nil
}
func (x *DiscoverHierarchyReply) GetNextPageToken() string {
if x != nil {
return x.NextPageToken
}
return ""
}
func (x *DiscoverHierarchyReply) GetTotalObjectCount() int32 {
if x != nil {
return x.TotalObjectCount
}
return 0
}
type WatchDeployEventsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Optional. When set, the bootstrap event is suppressed if the cached deploy
@@ -647,17 +814,35 @@ var File_galaxy_repository_proto protoreflect.FileDescriptor
const file_galaxy_repository_proto_rawDesc = "" +
"\n" +
"\x17galaxy_repository.proto\x12\x14galaxy_repository.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\x17\n" +
"\x17galaxy_repository.proto\x12\x14galaxy_repository.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\x17\n" +
"\x15TestConnectionRequest\"%\n" +
"\x13TestConnectionReply\x12\x0e\n" +
"\x02ok\x18\x01 \x01(\bR\x02ok\"\x1a\n" +
"\x18GetLastDeployTimeRequest\"}\n" +
"\x16GetLastDeployTimeReply\x12\x18\n" +
"\apresent\x18\x01 \x01(\bR\apresent\x12I\n" +
"\x13time_of_last_deploy\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\x10timeOfLastDeploy\"\x1a\n" +
"\x18DiscoverHierarchyRequest\"V\n" +
"\x13time_of_last_deploy\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\x10timeOfLastDeploy\"\xbb\x04\n" +
"\x18DiscoverHierarchyRequest\x12\x1b\n" +
"\tpage_size\x18\x01 \x01(\x05R\bpageSize\x12\x1d\n" +
"\n" +
"page_token\x18\x02 \x01(\tR\tpageToken\x12(\n" +
"\x0froot_gobject_id\x18\x03 \x01(\x05H\x00R\rrootGobjectId\x12$\n" +
"\rroot_tag_name\x18\x04 \x01(\tH\x00R\vrootTagName\x120\n" +
"\x13root_contained_path\x18\x05 \x01(\tH\x00R\x11rootContainedPath\x128\n" +
"\tmax_depth\x18\x06 \x01(\v2\x1b.google.protobuf.Int32ValueR\bmaxDepth\x12!\n" +
"\fcategory_ids\x18\a \x03(\x05R\vcategoryIds\x126\n" +
"\x17template_chain_contains\x18\b \x03(\tR\x15templateChainContains\x12\"\n" +
"\rtag_name_glob\x18\t \x01(\tR\vtagNameGlob\x122\n" +
"\x12include_attributes\x18\n" +
" \x01(\bH\x01R\x11includeAttributes\x88\x01\x01\x12,\n" +
"\x12alarm_bearing_only\x18\v \x01(\bR\x10alarmBearingOnly\x12'\n" +
"\x0fhistorized_only\x18\f \x01(\bR\x0ehistorizedOnlyB\x06\n" +
"\x04rootB\x15\n" +
"\x13_include_attributes\"\xac\x01\n" +
"\x16DiscoverHierarchyReply\x12<\n" +
"\aobjects\x18\x01 \x03(\v2\".galaxy_repository.v1.GalaxyObjectR\aobjects\"i\n" +
"\aobjects\x18\x01 \x03(\v2\".galaxy_repository.v1.GalaxyObjectR\aobjects\x12&\n" +
"\x0fnext_page_token\x18\x02 \x01(\tR\rnextPageToken\x12,\n" +
"\x12total_object_count\x18\x03 \x01(\x05R\x10totalObjectCount\"i\n" +
"\x18WatchDeployEventsRequest\x12M\n" +
"\x15last_seen_deploy_time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\x12lastSeenDeployTime\"\xbb\x02\n" +
"\vDeployEvent\x12\x1a\n" +
@@ -730,27 +915,29 @@ var file_galaxy_repository_proto_goTypes = []any{
(*GalaxyObject)(nil), // 8: galaxy_repository.v1.GalaxyObject
(*GalaxyAttribute)(nil), // 9: galaxy_repository.v1.GalaxyAttribute
(*timestamppb.Timestamp)(nil), // 10: google.protobuf.Timestamp
(*wrapperspb.Int32Value)(nil), // 11: google.protobuf.Int32Value
}
var file_galaxy_repository_proto_depIdxs = []int32{
10, // 0: galaxy_repository.v1.GetLastDeployTimeReply.time_of_last_deploy:type_name -> google.protobuf.Timestamp
8, // 1: galaxy_repository.v1.DiscoverHierarchyReply.objects:type_name -> galaxy_repository.v1.GalaxyObject
10, // 2: galaxy_repository.v1.WatchDeployEventsRequest.last_seen_deploy_time:type_name -> google.protobuf.Timestamp
10, // 3: galaxy_repository.v1.DeployEvent.observed_at:type_name -> google.protobuf.Timestamp
10, // 4: galaxy_repository.v1.DeployEvent.time_of_last_deploy:type_name -> google.protobuf.Timestamp
9, // 5: galaxy_repository.v1.GalaxyObject.attributes:type_name -> galaxy_repository.v1.GalaxyAttribute
0, // 6: galaxy_repository.v1.GalaxyRepository.TestConnection:input_type -> galaxy_repository.v1.TestConnectionRequest
2, // 7: galaxy_repository.v1.GalaxyRepository.GetLastDeployTime:input_type -> galaxy_repository.v1.GetLastDeployTimeRequest
4, // 8: galaxy_repository.v1.GalaxyRepository.DiscoverHierarchy:input_type -> galaxy_repository.v1.DiscoverHierarchyRequest
6, // 9: galaxy_repository.v1.GalaxyRepository.WatchDeployEvents:input_type -> galaxy_repository.v1.WatchDeployEventsRequest
1, // 10: galaxy_repository.v1.GalaxyRepository.TestConnection:output_type -> galaxy_repository.v1.TestConnectionReply
3, // 11: galaxy_repository.v1.GalaxyRepository.GetLastDeployTime:output_type -> galaxy_repository.v1.GetLastDeployTimeReply
5, // 12: galaxy_repository.v1.GalaxyRepository.DiscoverHierarchy:output_type -> galaxy_repository.v1.DiscoverHierarchyReply
7, // 13: galaxy_repository.v1.GalaxyRepository.WatchDeployEvents:output_type -> galaxy_repository.v1.DeployEvent
10, // [10:14] is the sub-list for method output_type
6, // [6:10] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
11, // 1: galaxy_repository.v1.DiscoverHierarchyRequest.max_depth:type_name -> google.protobuf.Int32Value
8, // 2: galaxy_repository.v1.DiscoverHierarchyReply.objects:type_name -> galaxy_repository.v1.GalaxyObject
10, // 3: galaxy_repository.v1.WatchDeployEventsRequest.last_seen_deploy_time:type_name -> google.protobuf.Timestamp
10, // 4: galaxy_repository.v1.DeployEvent.observed_at:type_name -> google.protobuf.Timestamp
10, // 5: galaxy_repository.v1.DeployEvent.time_of_last_deploy:type_name -> google.protobuf.Timestamp
9, // 6: galaxy_repository.v1.GalaxyObject.attributes:type_name -> galaxy_repository.v1.GalaxyAttribute
0, // 7: galaxy_repository.v1.GalaxyRepository.TestConnection:input_type -> galaxy_repository.v1.TestConnectionRequest
2, // 8: galaxy_repository.v1.GalaxyRepository.GetLastDeployTime:input_type -> galaxy_repository.v1.GetLastDeployTimeRequest
4, // 9: galaxy_repository.v1.GalaxyRepository.DiscoverHierarchy:input_type -> galaxy_repository.v1.DiscoverHierarchyRequest
6, // 10: galaxy_repository.v1.GalaxyRepository.WatchDeployEvents:input_type -> galaxy_repository.v1.WatchDeployEventsRequest
1, // 11: galaxy_repository.v1.GalaxyRepository.TestConnection:output_type -> galaxy_repository.v1.TestConnectionReply
3, // 12: galaxy_repository.v1.GalaxyRepository.GetLastDeployTime:output_type -> galaxy_repository.v1.GetLastDeployTimeReply
5, // 13: galaxy_repository.v1.GalaxyRepository.DiscoverHierarchy:output_type -> galaxy_repository.v1.DiscoverHierarchyReply
7, // 14: galaxy_repository.v1.GalaxyRepository.WatchDeployEvents:output_type -> galaxy_repository.v1.DeployEvent
11, // [11:15] is the sub-list for method output_type
7, // [7:11] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_galaxy_repository_proto_init() }
@@ -758,6 +945,11 @@ func file_galaxy_repository_proto_init() {
if File_galaxy_repository_proto != nil {
return
}
file_galaxy_repository_proto_msgTypes[4].OneofWrappers = []any{
(*DiscoverHierarchyRequest_RootGobjectId)(nil),
(*DiscoverHierarchyRequest_RootTagName)(nil),
(*DiscoverHierarchyRequest_RootContainedPath)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
+12
View File
@@ -16,6 +16,7 @@ import (
const (
defaultDialTimeout = 10 * time.Second
defaultCallTimeout = 30 * time.Second
defaultMaxGrpcMessageBytes = 16 * 1024 * 1024
)
// Client owns a gateway gRPC connection and exposes session-oriented helpers.
@@ -50,6 +51,10 @@ func Dial(ctx context.Context, opts Options) (*Client, error) {
grpc.WithTransportCredentials(transportCredentials),
grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)),
grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(resolveMaxGrpcMessageBytes(opts)),
grpc.MaxCallSendMsgSize(resolveMaxGrpcMessageBytes(opts)),
),
grpc.WithBlock(),
}
dialOptions = append(dialOptions, opts.DialOptions...)
@@ -62,6 +67,13 @@ func Dial(ctx context.Context, opts Options) (*Client, error) {
return NewClient(conn, opts), nil
}
func resolveMaxGrpcMessageBytes(opts Options) int {
if opts.MaxGrpcMessageBytes > 0 {
return opts.MaxGrpcMessageBytes
}
return defaultMaxGrpcMessageBytes
}
// NewClient wraps an existing gRPC connection. The caller owns closing conn
// unless it calls Close on the returned Client.
func NewClient(conn *grpc.ClientConn, opts Options) *Client {
+28 -4
View File
@@ -3,6 +3,7 @@ package mxgateway
import (
"context"
"errors"
"fmt"
"io"
"time"
@@ -13,6 +14,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
const discoverHierarchyPageSize = 5000
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
// Galaxy Repository service exposed for callers that need direct contract
// access.
@@ -70,6 +73,10 @@ func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) {
grpc.WithTransportCredentials(transportCredentials),
grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)),
grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(resolveMaxGrpcMessageBytes(opts)),
grpc.MaxCallSendMsgSize(resolveMaxGrpcMessageBytes(opts)),
),
grpc.WithBlock(),
}
dialOptions = append(dialOptions, opts.DialOptions...)
@@ -141,11 +148,28 @@ func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject,
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{})
if err != nil {
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
var objects []*GalaxyObject
seenPageTokens := make(map[string]struct{})
pageToken := ""
for {
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{
PageSize: discoverHierarchyPageSize,
PageToken: pageToken,
})
if err != nil {
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
}
objects = append(objects, reply.GetObjects()...)
pageToken = reply.GetNextPageToken()
if pageToken == "" {
break
}
if _, seen := seenPageTokens[pageToken]; seen {
return nil, fmt.Errorf("mxgateway: galaxy discover hierarchy returned repeated page token %q", pageToken)
}
seenPageTokens[pageToken] = struct{}{}
}
return reply.GetObjects(), nil
return objects, nil
}
// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
+45 -2
View File
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"net"
"strings"
"testing"
"time"
@@ -95,7 +96,9 @@ func TestGalaxyGetLastDeployTimeReturnsAbsentWhenTimestampNil(t *testing.T) {
func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
fake := &fakeGalaxyServer{
discoverReply: &pb.DiscoverHierarchyReply{
discoverReplies: []*pb.DiscoverHierarchyReply{{
NextPageToken: "page-2",
TotalObjectCount: 2,
Objects: []*pb.GalaxyObject{
{
GobjectId: 1,
@@ -114,6 +117,10 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
},
},
},
},
}, {
TotalObjectCount: 2,
Objects: []*pb.GalaxyObject{
{
GobjectId: 2,
TagName: "TestMachine_002",
@@ -121,7 +128,7 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
ParentGobjectId: 1,
},
},
},
}},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
@@ -133,6 +140,15 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
if len(objects) != 2 {
t.Fatalf("len(objects) = %d, want 2", len(objects))
}
if len(fake.discoverRequests) != 2 {
t.Fatalf("len(discoverRequests) = %d, want 2", len(fake.discoverRequests))
}
if fake.discoverRequests[0].GetPageSize() != 5000 || fake.discoverRequests[0].GetPageToken() != "" {
t.Fatalf("first request = %+v", fake.discoverRequests[0])
}
if fake.discoverRequests[1].GetPageToken() != "page-2" {
t.Fatalf("second page_token = %q, want page-2", fake.discoverRequests[1].GetPageToken())
}
if objects[0].GetTagName() != "TestMachine_001" {
t.Fatalf("objects[0].TagName = %q", objects[0].GetTagName())
}
@@ -144,6 +160,25 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
}
}
func TestGalaxyDiscoverHierarchyRejectsRepeatedPageToken(t *testing.T) {
fake := &fakeGalaxyServer{
discoverReplies: []*pb.DiscoverHierarchyReply{
{NextPageToken: "7:1"},
{NextPageToken: "7:1"},
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
_, err := client.DiscoverHierarchy(context.Background())
if err == nil {
t.Fatal("DiscoverHierarchy() error = nil, want repeated token error")
}
if !strings.Contains(err.Error(), "repeated page token") {
t.Fatalf("error = %v, want repeated page token", err)
}
}
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
fake := &fakeGalaxyServer{failTest: true}
client, cleanup := newGalaxyBufconnClient(t, fake)
@@ -375,6 +410,8 @@ type fakeGalaxyServer struct {
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
discoverReplies []*pb.DiscoverHierarchyReply
discoverRequests []*pb.DiscoverHierarchyRequest
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
@@ -400,6 +437,12 @@ func (s *fakeGalaxyServer) GetLastDeployTime(ctx context.Context, req *pb.GetLas
}
func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) {
s.discoverRequests = append(s.discoverRequests, req)
if len(s.discoverReplies) > 0 {
reply := s.discoverReplies[0]
s.discoverReplies = s.discoverReplies[1:]
return reply, nil
}
if s.discoverReply != nil {
return s.discoverReply, nil
}
+1
View File
@@ -18,6 +18,7 @@ type Options struct {
ServerNameOverride string
DialTimeout time.Duration
CallTimeout time.Duration
MaxGrpcMessageBytes int
TLSConfig *tls.Config
TransportCredentials credentials.TransportCredentials
DialOptions []grpc.DialOption
+1 -1
View File
@@ -7,7 +7,7 @@ const (
// GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion
// in the shared .NET contracts.
GatewayProtocolVersion uint32 = 1
GatewayProtocolVersion uint32 = 2
// WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion
// and is exposed for fake-worker and parity tests.
@@ -32,7 +32,7 @@ final class MxGatewayCliTests {
assertEquals(0, run.exitCode());
assertEquals("", run.errors());
assertTrue(run.output().contains("mxgateway-java 0.1.0"));
assertTrue(run.output().contains("gatewayProtocolVersion=1"));
assertTrue(run.output().contains("gatewayProtocolVersion=2"));
assertTrue(run.output().contains("workerProtocolVersion=1"));
}
@@ -42,7 +42,7 @@ final class MxGatewayCliTests {
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"clientVersion\":\"0.1.0\""));
assertTrue(run.output().contains("\"gatewayProtocolVersion\":1"));
assertTrue(run.output().contains("\"gatewayProtocolVersion\":2"));
}
@Test
@@ -11,6 +11,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Iterator-style adaptor over the {@code WatchDeployEvents} server-streaming
@@ -22,8 +23,8 @@ public final class DeployEventStream implements Iterator<DeployEvent>, AutoClose
private static final Object END = new Object();
private final BlockingQueue<Object> queue;
private final AtomicBoolean closed = new AtomicBoolean();
private volatile ClientCallStreamObserver<WatchDeployEventsRequest> requestStream;
private volatile boolean closed;
private Object next;
DeployEventStream(int capacity) {
@@ -35,6 +36,9 @@ public final class DeployEventStream implements Iterator<DeployEvent>, AutoClose
@Override
public void beforeStart(ClientCallStreamObserver<WatchDeployEventsRequest> requestStream) {
DeployEventStream.this.requestStream = requestStream;
if (closed.get()) {
requestStream.cancel("client cancelled deploy event stream", null);
}
}
@Override
@@ -44,7 +48,7 @@ public final class DeployEventStream implements Iterator<DeployEvent>, AutoClose
@Override
public void onError(Throwable error) {
if (Status.fromThrowable(error).getCode() == Status.Code.CANCELLED && closed) {
if (Status.fromThrowable(error).getCode() == Status.Code.CANCELLED && closed.get()) {
offer(END);
return;
}
@@ -90,7 +94,7 @@ public final class DeployEventStream implements Iterator<DeployEvent>, AutoClose
@Override
public void close() {
closed = true;
closed.set(true);
ClientCallStreamObserver<WatchDeployEventsRequest> stream = requestStream;
if (stream != null) {
stream.cancel("client cancelled deploy event stream", null);
@@ -36,6 +36,8 @@ import javax.net.ssl.SSLException;
* {@link MxGatewayClient}.
*/
public final class GalaxyRepositoryClient implements AutoCloseable {
private static final int DISCOVER_HIERARCHY_PAGE_SIZE = 5000;
private final ManagedChannel ownedChannel;
private final MxGatewayClientOptions options;
private final GalaxyRepositoryGrpc.GalaxyRepositoryBlockingStub blockingStub;
@@ -130,9 +132,22 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
*/
public List<GalaxyObject> discoverHierarchy() {
try {
DiscoverHierarchyReply reply =
rawBlockingStub().discoverHierarchy(DiscoverHierarchyRequest.getDefaultInstance());
return reply.getObjectsList();
java.util.ArrayList<GalaxyObject> objects = new java.util.ArrayList<>();
java.util.HashSet<String> seenPageTokens = new java.util.HashSet<>();
String pageToken = "";
do {
DiscoverHierarchyReply reply = rawBlockingStub().discoverHierarchy(DiscoverHierarchyRequest.newBuilder()
.setPageSize(DISCOVER_HIERARCHY_PAGE_SIZE)
.setPageToken(pageToken)
.build());
objects.addAll(reply.getObjectsList());
pageToken = reply.getNextPageToken();
if (!pageToken.isBlank() && !seenPageTokens.add(pageToken)) {
throw new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: " + pageToken);
}
} while (!pageToken.isBlank());
return objects;
} catch (RuntimeException error) {
if (error instanceof MxGatewayException) {
throw error;
@@ -142,8 +157,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
}
public CompletableFuture<List<GalaxyObject>> discoverHierarchyAsync() {
return toCompletable(rawFutureStub().discoverHierarchy(DiscoverHierarchyRequest.getDefaultInstance()))
.thenApply(DiscoverHierarchyReply::getObjectsList);
return discoverHierarchyPageAsync("", new java.util.ArrayList<>(), new java.util.HashSet<>());
}
/**
@@ -226,7 +240,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
private static ManagedChannel createChannel(MxGatewayClientOptions options) {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint())
.maxInboundMessageSize(16 * 1024 * 1024);
.maxInboundMessageSize(options.maxGrpcMessageBytes());
if (!options.connectTimeout().isNegative()) {
builder.withOption(
io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS,
@@ -258,6 +272,27 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
private CompletableFuture<List<GalaxyObject>> discoverHierarchyPageAsync(
String pageToken, java.util.ArrayList<GalaxyObject> objects, java.util.HashSet<String> seenPageTokens) {
DiscoverHierarchyRequest request = DiscoverHierarchyRequest.newBuilder()
.setPageSize(DISCOVER_HIERARCHY_PAGE_SIZE)
.setPageToken(pageToken)
.build();
return toCompletable(rawFutureStub().discoverHierarchy(request)).thenCompose(reply -> {
objects.addAll(reply.getObjectsList());
if (reply.getNextPageToken().isBlank()) {
return CompletableFuture.completedFuture(objects);
}
if (!seenPageTokens.add(reply.getNextPageToken())) {
CompletableFuture<List<GalaxyObject>> failed = new CompletableFuture<>();
failed.completeExceptionally(new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: " + reply.getNextPageToken()));
return failed;
}
return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens);
});
}
private static <T> CompletableFuture<T> toCompletable(com.google.common.util.concurrent.ListenableFuture<T> source) {
CompletableFuture<T> target = new CompletableFuture<>();
Futures.addCallback(
@@ -169,7 +169,7 @@ public final class MxGatewayClient implements AutoCloseable {
private static ManagedChannel createChannel(MxGatewayClientOptions options) {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint())
.maxInboundMessageSize(16 * 1024 * 1024);
.maxInboundMessageSize(options.maxGrpcMessageBytes());
if (!options.connectTimeout().isNegative()) {
builder.withOption(
io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS,
@@ -7,6 +7,7 @@ import java.util.Objects;
public final class MxGatewayClientOptions {
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofSeconds(30);
private static final int DEFAULT_MAX_GRPC_MESSAGE_BYTES = 16 * 1024 * 1024;
private final String endpoint;
private final String apiKey;
@@ -16,6 +17,7 @@ public final class MxGatewayClientOptions {
private final Duration connectTimeout;
private final Duration callTimeout;
private final Duration streamTimeout;
private final int maxGrpcMessageBytes;
private MxGatewayClientOptions(Builder builder) {
endpoint = requireText(builder.endpoint, "endpoint");
@@ -26,6 +28,9 @@ public final class MxGatewayClientOptions {
connectTimeout = builder.connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : builder.connectTimeout;
callTimeout = builder.callTimeout == null ? DEFAULT_CALL_TIMEOUT : builder.callTimeout;
streamTimeout = builder.streamTimeout;
maxGrpcMessageBytes = builder.maxGrpcMessageBytes <= 0
? DEFAULT_MAX_GRPC_MESSAGE_BYTES
: builder.maxGrpcMessageBytes;
}
public static Builder builder() {
@@ -68,6 +73,10 @@ public final class MxGatewayClientOptions {
return streamTimeout;
}
public int maxGrpcMessageBytes() {
return maxGrpcMessageBytes;
}
@Override
public String toString() {
return "MxGatewayClientOptions{"
@@ -90,6 +99,8 @@ public final class MxGatewayClientOptions {
+ callTimeout
+ ", streamTimeout="
+ streamTimeout
+ ", maxGrpcMessageBytes="
+ maxGrpcMessageBytes
+ '}';
}
@@ -109,6 +120,7 @@ public final class MxGatewayClientOptions {
private Duration connectTimeout;
private Duration callTimeout;
private Duration streamTimeout;
private int maxGrpcMessageBytes;
private Builder() {
}
@@ -153,6 +165,11 @@ public final class MxGatewayClientOptions {
return this;
}
public Builder maxGrpcMessageBytes(int value) {
maxGrpcMessageBytes = value;
return this;
}
public MxGatewayClientOptions build() {
return new MxGatewayClientOptions(this);
}
@@ -1,7 +1,7 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewayClientVersion {
private static final int GATEWAY_PROTOCOL_VERSION = 1;
private static final int GATEWAY_PROTOCOL_VERSION = 2;
private static final int WORKER_PROTOCOL_VERSION = 1;
private static final String CLIENT_VERSION = "0.1.0";
@@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.protobuf.Timestamp;
@@ -25,6 +26,8 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;
@@ -100,31 +103,44 @@ final class GalaxyRepositoryClientTests {
@Test
void discoverHierarchyReturnsObjectsAndAttributes() throws Exception {
AtomicReference<DiscoverHierarchyRequest> seenRequest = new AtomicReference<>();
AtomicReference<DiscoverHierarchyRequest> firstRequest = new AtomicReference<>();
AtomicReference<DiscoverHierarchyRequest> secondRequest = new AtomicReference<>();
TestService service = new TestService() {
@Override
public void discoverHierarchy(
DiscoverHierarchyRequest request, StreamObserver<DiscoverHierarchyReply> responseObserver) {
seenRequest.set(request);
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.addObjects(GalaxyObject.newBuilder()
.setGobjectId(7)
.setTagName("Pump_001")
.setContainedName("Pump")
.setBrowseName("Pump")
.setParentGobjectId(1)
.setIsArea(false)
.setCategoryId(3)
.setHostedByGobjectId(0)
.addTemplateChain("$Pump")
.addAttributes(GalaxyAttribute.newBuilder()
.setAttributeName("Speed")
.setFullTagReference("Pump_001.Speed")
.setMxDataType(5)
.setDataTypeName("MxFloat")
.setIsArray(false)
.setIsHistorized(true)))
.build());
if (request.getPageToken().isEmpty()) {
firstRequest.set(request);
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.setNextPageToken("page-2")
.setTotalObjectCount(2)
.addObjects(GalaxyObject.newBuilder()
.setGobjectId(7)
.setTagName("Pump_001")
.setContainedName("Pump")
.setBrowseName("Pump")
.setParentGobjectId(1)
.setIsArea(false)
.setCategoryId(3)
.setHostedByGobjectId(0)
.addTemplateChain("$Pump")
.addAttributes(GalaxyAttribute.newBuilder()
.setAttributeName("Speed")
.setFullTagReference("Pump_001.Speed")
.setMxDataType(5)
.setDataTypeName("MxFloat")
.setIsArray(false)
.setIsHistorized(true)))
.build());
} else {
secondRequest.set(request);
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.setTotalObjectCount(2)
.addObjects(GalaxyObject.newBuilder()
.setGobjectId(8)
.setTagName("Pump_002"))
.build());
}
responseObserver.onCompleted();
}
};
@@ -132,7 +148,10 @@ final class GalaxyRepositoryClientTests {
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<GalaxyObject> objects = client.discoverHierarchy();
assertEquals(1, objects.size());
assertEquals(2, objects.size());
assertEquals(5000, firstRequest.get().getPageSize());
assertEquals("", firstRequest.get().getPageToken());
assertEquals("page-2", secondRequest.get().getPageToken());
GalaxyObject only = objects.get(0);
assertEquals(7, only.getGobjectId());
assertEquals("Pump_001", only.getTagName());
@@ -142,6 +161,41 @@ final class GalaxyRepositoryClientTests {
}
}
@Test
void deployEventStreamCloseBeforeBeforeStartCancelsStream() {
DeployEventStream stream = new DeployEventStream(4);
ClientResponseObserver<WatchDeployEventsRequest, DeployEvent> observer = stream.observer();
RecordingClientCallStreamObserver requestStream = new RecordingClientCallStreamObserver();
stream.close();
observer.beforeStart(requestStream);
assertTrue(requestStream.cancelled);
assertEquals("client cancelled deploy event stream", requestStream.cancelMessage);
assertFalse(stream.hasNext());
}
@Test
void discoverHierarchyRejectsRepeatedPageToken() throws Exception {
TestService service = new TestService() {
@Override
public void discoverHierarchy(
DiscoverHierarchyRequest request, StreamObserver<DiscoverHierarchyReply> responseObserver) {
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.setNextPageToken("7:1")
.build());
responseObserver.onCompleted();
}
};
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
MxGatewayException error = assertThrows(MxGatewayException.class, client::discoverHierarchy);
assertTrue(error.getMessage().contains("repeated page token"));
}
}
@Test
void watchDeployEventsReceivesEventsInOrder() throws Exception {
DeployEvent first = DeployEvent.newBuilder()
@@ -281,6 +335,51 @@ final class GalaxyRepositoryClientTests {
}
}
private static final class RecordingClientCallStreamObserver
extends ClientCallStreamObserver<WatchDeployEventsRequest> {
private boolean cancelled;
private String cancelMessage;
@Override
public boolean isReady() {
return true;
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
}
@Override
public void disableAutoInboundFlowControl() {
}
@Override
public void request(int count) {
}
@Override
public void setMessageCompression(boolean enable) {
}
@Override
public void cancel(String message, Throwable cause) {
cancelled = true;
cancelMessage = message;
}
@Override
public void onNext(WatchDeployEventsRequest value) {
}
@Override
public void onError(Throwable error) {
}
@Override
public void onCompleted() {
}
}
private record InProcessGalaxy(Server server, ManagedChannel channel) implements AutoCloseable {
static InProcessGalaxy start(
GalaxyRepositoryGrpc.GalaxyRepositoryImplBase service, AtomicReference<String> authorization)
@@ -2,7 +2,7 @@
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-client-behavior",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"gatewayProtocolVersion": 2,
"workerProtocolVersion": 1,
"protoInputManifest": "clients/proto/proto-inputs.json",
"fixtures": [
@@ -3,7 +3,7 @@
"backendName": "mxaccess-worker",
"workerProcessId": 1234,
"workerProtocolVersion": 1,
"gatewayProtocolVersion": 1,
"gatewayProtocolVersion": 2,
"capabilities": [
"unary-open-session",
"unary-close-session",
@@ -2,7 +2,7 @@
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-parity-fixture-matrix",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"gatewayProtocolVersion": 2,
"workerProtocolVersion": 1,
"sourceCaptureRoot": "C:/Users/dohertj2/Desktop/mxaccess/captures",
"sourceDocs": [
+1 -1
View File
@@ -1,7 +1,7 @@
{
"schemaVersion": 1,
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"gatewayProtocolVersion": 2,
"workerProtocolVersion": 1,
"protoRoot": "src/MxGateway.Contracts/Protos",
"sourceFiles": [
+24 -7
View File
@@ -18,11 +18,13 @@ import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from .auth import merge_metadata
from .errors import map_rpc_error
from .errors import MxGatewayError, map_rpc_error
from .generated import galaxy_repository_pb2 as galaxy_pb
from .generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
from .options import ClientOptions, create_channel
_DISCOVER_HIERARCHY_PAGE_SIZE = 5000
class GalaxyRepositoryClient:
"""Async client for the Galaxy Repository gRPC service."""
@@ -112,12 +114,27 @@ class GalaxyRepositoryClient:
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
reply = await self._unary(
"discover hierarchy",
self.raw_stub.DiscoverHierarchy,
galaxy_pb.DiscoverHierarchyRequest(),
)
return list(reply.objects)
objects: list[galaxy_pb.GalaxyObject] = []
seen_page_tokens: set[str] = set()
page_token = ""
while True:
reply = await self._unary(
"discover hierarchy",
self.raw_stub.DiscoverHierarchy,
galaxy_pb.DiscoverHierarchyRequest(
page_size=_DISCOVER_HIERARCHY_PAGE_SIZE,
page_token=page_token,
),
)
objects.extend(reply.objects)
page_token = reply.next_page_token
if not page_token:
return objects
if page_token in seen_page_tokens:
raise MxGatewayError(
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
)
seen_page_tokens.add(page_token)
def watch_deploy_events(
self,
@@ -23,9 +23,10 @@ _sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17galaxy_repository.proto\x12\x14galaxy_repository.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\x17\n\x15TestConnectionRequest\"!\n\x13TestConnectionReply\x12\n\n\x02ok\x18\x01 \x01(\x08\"\x1a\n\x18GetLastDeployTimeRequest\"b\n\x16GetLastDeployTimeReply\x12\x0f\n\x07present\x18\x01 \x01(\x08\x12\x37\n\x13time_of_last_deploy\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1a\n\x18\x44iscoverHierarchyRequest\"M\n\x16\x44iscoverHierarchyReply\x12\x33\n\x07objects\x18\x01 \x03(\x0b\x32\".galaxy_repository.v1.GalaxyObject\"U\n\x18WatchDeployEventsRequest\x12\x39\n\x15last_seen_deploy_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xdd\x01\n\x0b\x44\x65ployEvent\x12\x10\n\x08sequence\x18\x01 \x01(\x04\x12/\n\x0bobserved_at\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x37\n\x13time_of_last_deploy\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12#\n\x1btime_of_last_deploy_present\x18\x04 \x01(\x08\x12\x14\n\x0cobject_count\x18\x05 \x01(\x05\x12\x17\n\x0f\x61ttribute_count\x18\x06 \x01(\x05\"\x93\x02\n\x0cGalaxyObject\x12\x12\n\ngobject_id\x18\x01 \x01(\x05\x12\x10\n\x08tag_name\x18\x02 \x01(\t\x12\x16\n\x0e\x63ontained_name\x18\x03 \x01(\t\x12\x13\n\x0b\x62rowse_name\x18\x04 \x01(\t\x12\x19\n\x11parent_gobject_id\x18\x05 \x01(\x05\x12\x0f\n\x07is_area\x18\x06 \x01(\x08\x12\x13\n\x0b\x63\x61tegory_id\x18\x07 \x01(\x05\x12\x1c\n\x14hosted_by_gobject_id\x18\x08 \x01(\x05\x12\x16\n\x0etemplate_chain\x18\t \x03(\t\x12\x39\n\nattributes\x18\n \x03(\x0b\x32%.galaxy_repository.v1.GalaxyAttribute\"\xa8\x02\n\x0fGalaxyAttribute\x12\x16\n\x0e\x61ttribute_name\x18\x01 \x01(\t\x12\x1a\n\x12\x66ull_tag_reference\x18\x02 \x01(\t\x12\x14\n\x0cmx_data_type\x18\x03 \x01(\x05\x12\x16\n\x0e\x64\x61ta_type_name\x18\x04 \x01(\t\x12\x10\n\x08is_array\x18\x05 \x01(\x08\x12\x17\n\x0f\x61rray_dimension\x18\x06 \x01(\x05\x12\x1f\n\x17\x61rray_dimension_present\x18\x07 \x01(\x08\x12\x1d\n\x15mx_attribute_category\x18\x08 \x01(\x05\x12\x1f\n\x17security_classification\x18\t \x01(\x05\x12\x15\n\ris_historized\x18\n \x01(\x08\x12\x10\n\x08is_alarm\x18\x0b \x01(\x08\x32\xcc\x03\n\x10GalaxyRepository\x12h\n\x0eTestConnection\x12+.galaxy_repository.v1.TestConnectionRequest\x1a).galaxy_repository.v1.TestConnectionReply\x12q\n\x11GetLastDeployTime\x12..galaxy_repository.v1.GetLastDeployTimeRequest\x1a,.galaxy_repository.v1.GetLastDeployTimeReply\x12q\n\x11\x44iscoverHierarchy\x12..galaxy_repository.v1.DiscoverHierarchyRequest\x1a,.galaxy_repository.v1.DiscoverHierarchyReply\x12h\n\x11WatchDeployEvents\x12..galaxy_repository.v1.WatchDeployEventsRequest\x1a!.galaxy_repository.v1.DeployEvent0\x01\x42#\xaa\x02 MxGateway.Contracts.Proto.Galaxyb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17galaxy_repository.proto\x12\x14galaxy_repository.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\x17\n\x15TestConnectionRequest\"!\n\x13TestConnectionReply\x12\n\n\x02ok\x18\x01 \x01(\x08\"\x1a\n\x18GetLastDeployTimeRequest\"b\n\x16GetLastDeployTimeReply\x12\x0f\n\x07present\x18\x01 \x01(\x08\x12\x37\n\x13time_of_last_deploy\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x87\x03\n\x18\x44iscoverHierarchyRequest\x12\x11\n\tpage_size\x18\x01 \x01(\x05\x12\x12\n\npage_token\x18\x02 \x01(\t\x12\x19\n\x0froot_gobject_id\x18\x03 \x01(\x05H\x00\x12\x17\n\rroot_tag_name\x18\x04 \x01(\tH\x00\x12\x1d\n\x13root_contained_path\x18\x05 \x01(\tH\x00\x12.\n\tmax_depth\x18\x06 \x01(\x0b\x32\x1b.google.protobuf.Int32Value\x12\x14\n\x0c\x63\x61tegory_ids\x18\x07 \x03(\x05\x12\x1f\n\x17template_chain_contains\x18\x08 \x03(\t\x12\x15\n\rtag_name_glob\x18\t \x01(\t\x12\x1f\n\x12include_attributes\x18\n \x01(\x08H\x01\x88\x01\x01\x12\x1a\n\x12\x61larm_bearing_only\x18\x0b \x01(\x08\x12\x17\n\x0fhistorized_only\x18\x0c \x01(\x08\x42\x06\n\x04rootB\x15\n\x13_include_attributes\"\x82\x01\n\x16\x44iscoverHierarchyReply\x12\x33\n\x07objects\x18\x01 \x03(\x0b\x32\".galaxy_repository.v1.GalaxyObject\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\t\x12\x1a\n\x12total_object_count\x18\x03 \x01(\x05\"U\n\x18WatchDeployEventsRequest\x12\x39\n\x15last_seen_deploy_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xdd\x01\n\x0b\x44\x65ployEvent\x12\x10\n\x08sequence\x18\x01 \x01(\x04\x12/\n\x0bobserved_at\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x37\n\x13time_of_last_deploy\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12#\n\x1btime_of_last_deploy_present\x18\x04 \x01(\x08\x12\x14\n\x0cobject_count\x18\x05 \x01(\x05\x12\x17\n\x0f\x61ttribute_count\x18\x06 \x01(\x05\"\x93\x02\n\x0cGalaxyObject\x12\x12\n\ngobject_id\x18\x01 \x01(\x05\x12\x10\n\x08tag_name\x18\x02 \x01(\t\x12\x16\n\x0e\x63ontained_name\x18\x03 \x01(\t\x12\x13\n\x0b\x62rowse_name\x18\x04 \x01(\t\x12\x19\n\x11parent_gobject_id\x18\x05 \x01(\x05\x12\x0f\n\x07is_area\x18\x06 \x01(\x08\x12\x13\n\x0b\x63\x61tegory_id\x18\x07 \x01(\x05\x12\x1c\n\x14hosted_by_gobject_id\x18\x08 \x01(\x05\x12\x16\n\x0etemplate_chain\x18\t \x03(\t\x12\x39\n\nattributes\x18\n \x03(\x0b\x32%.galaxy_repository.v1.GalaxyAttribute\"\xa8\x02\n\x0fGalaxyAttribute\x12\x16\n\x0e\x61ttribute_name\x18\x01 \x01(\t\x12\x1a\n\x12\x66ull_tag_reference\x18\x02 \x01(\t\x12\x14\n\x0cmx_data_type\x18\x03 \x01(\x05\x12\x16\n\x0e\x64\x61ta_type_name\x18\x04 \x01(\t\x12\x10\n\x08is_array\x18\x05 \x01(\x08\x12\x17\n\x0f\x61rray_dimension\x18\x06 \x01(\x05\x12\x1f\n\x17\x61rray_dimension_present\x18\x07 \x01(\x08\x12\x1d\n\x15mx_attribute_category\x18\x08 \x01(\x05\x12\x1f\n\x17security_classification\x18\t \x01(\x05\x12\x15\n\ris_historized\x18\n \x01(\x08\x12\x10\n\x08is_alarm\x18\x0b \x01(\x08\x32\xcc\x03\n\x10GalaxyRepository\x12h\n\x0eTestConnection\x12+.galaxy_repository.v1.TestConnectionRequest\x1a).galaxy_repository.v1.TestConnectionReply\x12q\n\x11GetLastDeployTime\x12..galaxy_repository.v1.GetLastDeployTimeRequest\x1a,.galaxy_repository.v1.GetLastDeployTimeReply\x12q\n\x11\x44iscoverHierarchy\x12..galaxy_repository.v1.DiscoverHierarchyRequest\x1a,.galaxy_repository.v1.DiscoverHierarchyReply\x12h\n\x11WatchDeployEvents\x12..galaxy_repository.v1.WatchDeployEventsRequest\x1a!.galaxy_repository.v1.DeployEvent0\x01\x42#\xaa\x02 MxGateway.Contracts.Proto.Galaxyb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -33,26 +34,26 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'galaxy_repository_pb2', _gl
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\252\002 MxGateway.Contracts.Proto.Galaxy'
_globals['_TESTCONNECTIONREQUEST']._serialized_start=82
_globals['_TESTCONNECTIONREQUEST']._serialized_end=105
_globals['_TESTCONNECTIONREPLY']._serialized_start=107
_globals['_TESTCONNECTIONREPLY']._serialized_end=140
_globals['_GETLASTDEPLOYTIMEREQUEST']._serialized_start=142
_globals['_GETLASTDEPLOYTIMEREQUEST']._serialized_end=168
_globals['_GETLASTDEPLOYTIMEREPLY']._serialized_start=170
_globals['_GETLASTDEPLOYTIMEREPLY']._serialized_end=268
_globals['_DISCOVERHIERARCHYREQUEST']._serialized_start=270
_globals['_DISCOVERHIERARCHYREQUEST']._serialized_end=296
_globals['_DISCOVERHIERARCHYREPLY']._serialized_start=298
_globals['_DISCOVERHIERARCHYREPLY']._serialized_end=375
_globals['_WATCHDEPLOYEVENTSREQUEST']._serialized_start=377
_globals['_WATCHDEPLOYEVENTSREQUEST']._serialized_end=462
_globals['_DEPLOYEVENT']._serialized_start=465
_globals['_DEPLOYEVENT']._serialized_end=686
_globals['_GALAXYOBJECT']._serialized_start=689
_globals['_GALAXYOBJECT']._serialized_end=964
_globals['_GALAXYATTRIBUTE']._serialized_start=967
_globals['_GALAXYATTRIBUTE']._serialized_end=1263
_globals['_GALAXYREPOSITORY']._serialized_start=1266
_globals['_GALAXYREPOSITORY']._serialized_end=1726
_globals['_TESTCONNECTIONREQUEST']._serialized_start=114
_globals['_TESTCONNECTIONREQUEST']._serialized_end=137
_globals['_TESTCONNECTIONREPLY']._serialized_start=139
_globals['_TESTCONNECTIONREPLY']._serialized_end=172
_globals['_GETLASTDEPLOYTIMEREQUEST']._serialized_start=174
_globals['_GETLASTDEPLOYTIMEREQUEST']._serialized_end=200
_globals['_GETLASTDEPLOYTIMEREPLY']._serialized_start=202
_globals['_GETLASTDEPLOYTIMEREPLY']._serialized_end=300
_globals['_DISCOVERHIERARCHYREQUEST']._serialized_start=303
_globals['_DISCOVERHIERARCHYREQUEST']._serialized_end=694
_globals['_DISCOVERHIERARCHYREPLY']._serialized_start=697
_globals['_DISCOVERHIERARCHYREPLY']._serialized_end=827
_globals['_WATCHDEPLOYEVENTSREQUEST']._serialized_start=829
_globals['_WATCHDEPLOYEVENTSREQUEST']._serialized_end=914
_globals['_DEPLOYEVENT']._serialized_start=917
_globals['_DEPLOYEVENT']._serialized_end=1138
_globals['_GALAXYOBJECT']._serialized_start=1141
_globals['_GALAXYOBJECT']._serialized_end=1416
_globals['_GALAXYATTRIBUTE']._serialized_start=1419
_globals['_GALAXYATTRIBUTE']._serialized_end=1715
_globals['_GALAXYREPOSITORY']._serialized_start=1718
_globals['_GALAXYREPOSITORY']._serialized_end=2178
# @@protoc_insertion_point(module_scope)
+9 -2
View File
@@ -21,6 +21,7 @@ class ClientOptions:
server_name_override: str | None = None
call_timeout: float | None = 30.0
stream_timeout: float | None = None
max_grpc_message_bytes: int = 16 * 1024 * 1024
def __post_init__(self) -> None:
if not self.endpoint:
@@ -32,6 +33,8 @@ class ClientOptions:
raise ValueError("call_timeout must be greater than zero")
if self.stream_timeout is not None and self.stream_timeout <= 0:
raise ValueError("stream_timeout must be greater than zero")
if self.max_grpc_message_bytes <= 0:
raise ValueError("max_grpc_message_bytes must be greater than zero")
def __repr__(self) -> str:
api_key = REDACTED if self.api_key else None
@@ -41,14 +44,18 @@ class ClientOptions:
f"ca_file={self.ca_file!r}, "
f"server_name_override={self.server_name_override!r}, "
f"call_timeout={self.call_timeout!r}, "
f"stream_timeout={self.stream_timeout!r})"
f"stream_timeout={self.stream_timeout!r}, "
f"max_grpc_message_bytes={self.max_grpc_message_bytes!r})"
)
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
"""Create a plaintext or TLS `grpc.aio` channel from client options."""
channel_options: list[tuple[str, str]] = []
channel_options: list[tuple[str, str | int]] = [
("grpc.max_receive_message_length", options.max_grpc_message_bytes),
("grpc.max_send_message_length", options.max_grpc_message_bytes),
]
if options.server_name_override:
channel_options.append(("grpc.ssl_target_name_override", options.server_name_override))
+19 -7
View File
@@ -61,7 +61,15 @@ def test_create_channel_uses_plaintext_channel(monkeypatch: pytest.MonkeyPatch)
channel = create_channel(ClientOptions(endpoint="localhost:5000", plaintext=True))
assert channel == "plain-channel"
assert calls == [("localhost:5000", [])]
assert calls == [
(
"localhost:5000",
[
("grpc.max_receive_message_length", 16 * 1024 * 1024),
("grpc.max_send_message_length", 16 * 1024 * 1024),
],
),
]
def test_create_channel_uses_tls_channel(monkeypatch: pytest.MonkeyPatch) -> None:
@@ -95,9 +103,13 @@ def test_create_channel_uses_tls_channel(monkeypatch: pytest.MonkeyPatch) -> Non
assert channel == "tls-channel"
assert calls == [
(
"gateway.example:5001",
"creds",
[("grpc.ssl_target_name_override", "gateway.test")],
),
]
(
"gateway.example:5001",
"creds",
[
("grpc.max_receive_message_length", 16 * 1024 * 1024),
("grpc.max_send_message_length", 16 * 1024 * 1024),
("grpc.ssl_target_name_override", "gateway.test"),
],
),
]
+27
View File
@@ -98,6 +98,8 @@ async def test_discover_hierarchy_returns_proto_objects() -> None:
stub = FakeGalaxyStub()
stub.discover_hierarchy.replies = [
galaxy_pb.DiscoverHierarchyReply(
next_page_token="page-2",
total_object_count=2,
objects=[
galaxy_pb.GalaxyObject(
gobject_id=1,
@@ -106,6 +108,11 @@ async def test_discover_hierarchy_returns_proto_objects() -> None:
browse_name="TestMachine_001",
is_area=True,
),
],
),
galaxy_pb.DiscoverHierarchyReply(
total_object_count=2,
objects=[
galaxy_pb.GalaxyObject(
gobject_id=2,
tag_name="DelmiaReceiver_001",
@@ -133,10 +140,30 @@ async def test_discover_hierarchy_returns_proto_objects() -> None:
assert isinstance(objects, list)
assert len(objects) == 2
assert len(stub.discover_hierarchy.requests) == 2
assert stub.discover_hierarchy.requests[0].page_size == 5000
assert stub.discover_hierarchy.requests[0].page_token == ""
assert stub.discover_hierarchy.requests[1].page_token == "page-2"
assert objects[0].tag_name == "TestMachine_001"
assert objects[1].attributes[0].full_tag_reference == "DelmiaReceiver_001.DownloadPath"
@pytest.mark.asyncio
async def test_discover_hierarchy_rejects_repeated_page_token() -> None:
stub = FakeGalaxyStub()
stub.discover_hierarchy.replies = [
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
with pytest.raises(Exception, match="repeated page token"):
await client.discover_hierarchy()
@pytest.mark.asyncio
async def test_watch_deploy_events_yields_events_in_order() -> None:
ts1 = Timestamp()
+1 -1
View File
@@ -1038,7 +1038,7 @@ mod tests {
fn version_json_output_has_protocol_versions() {
let value = super::version_json();
assert_eq!(value["gatewayProtocolVersion"], 1);
assert_eq!(value["gatewayProtocolVersion"], 2);
assert_eq!(value["workerProtocolVersion"], 1);
}
+4 -1
View File
@@ -54,9 +54,12 @@ impl GatewayClient {
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
let max_grpc_message_bytes = options.max_grpc_message_bytes();
Ok(Self {
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor),
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(max_grpc_message_bytes)
.max_encoding_message_size(max_grpc_message_bytes),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
})
+140 -33
View File
@@ -21,6 +21,8 @@ use crate::generated::galaxy_repository::v1::{
};
use crate::options::ClientOptions;
const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000;
/// Convenience alias for the generated Galaxy client wrapped in the
/// authentication interceptor.
pub type RawGalaxyClient = GalaxyRepositoryClient<InterceptedService<Channel, AuthInterceptor>>;
@@ -77,9 +79,12 @@ impl GalaxyClient {
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
let max_grpc_message_bytes = options.max_grpc_message_bytes();
Ok(Self {
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor),
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(max_grpc_message_bytes)
.max_encoding_message_size(max_grpc_message_bytes),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
})
@@ -89,8 +94,11 @@ impl GalaxyClient {
/// channel. Tests use this to wire up an in-memory transport.
pub fn from_channel(channel: Channel, options: &ClientOptions) -> Self {
let interceptor = AuthInterceptor::new(options.api_key().cloned());
let max_grpc_message_bytes = options.max_grpc_message_bytes();
Self {
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor),
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(max_grpc_message_bytes)
.max_encoding_message_size(max_grpc_message_bytes),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
}
@@ -135,11 +143,33 @@ impl GalaxyClient {
/// Walk the deployed object hierarchy. Each [`GalaxyObject`] contains
/// the object's identifying names plus its dynamic attributes.
pub async fn discover_hierarchy(&mut self) -> Result<Vec<GalaxyObject>, Error> {
let response = self
.inner
.discover_hierarchy(self.unary_request(DiscoverHierarchyRequest {}))
.await?;
Ok(response.into_inner().objects)
let mut objects = Vec::new();
let mut seen_page_tokens = std::collections::HashSet::new();
let mut page_token = String::new();
loop {
let response = self
.inner
.discover_hierarchy(self.unary_request(DiscoverHierarchyRequest {
page_size: DISCOVER_HIERARCHY_PAGE_SIZE,
page_token,
..Default::default()
}))
.await?;
let reply = response.into_inner();
objects.extend(reply.objects);
page_token = reply.next_page_token;
if page_token.is_empty() {
return Ok(objects);
}
if !seen_page_tokens.insert(page_token.clone()) {
return Err(Error::InvalidArgument {
name: "page_token".to_owned(),
detail: format!(
"galaxy discover hierarchy returned repeated page token `{page_token}`"
),
});
}
}
}
/// Subscribe to the server-streamed deploy-event feed.
@@ -217,6 +247,8 @@ mod tests {
present: Mutex<bool>,
last_deploy: Mutex<Option<Timestamp>>,
objects: Mutex<Vec<GalaxyObject>>,
discover_requests: Mutex<Vec<DiscoverHierarchyRequest>>,
discover_replies: Mutex<std::collections::VecDeque<DiscoverHierarchyReply>>,
watch_requests: Mutex<Vec<WatchDeployEventsRequest>>,
watch_events: Mutex<Vec<DeployEvent>>,
watch_senders: Mutex<Vec<DeployEventTx>>,
@@ -256,10 +288,21 @@ mod tests {
async fn discover_hierarchy(
&self,
_request: Request<DiscoverHierarchyRequest>,
request: Request<DiscoverHierarchyRequest>,
) -> Result<Response<DiscoverHierarchyReply>, Status> {
self.state
.discover_requests
.lock()
.unwrap()
.push(request.into_inner());
if let Some(reply) = self.state.discover_replies.lock().unwrap().pop_front() {
return Ok(Response::new(reply));
}
Ok(Response::new(DiscoverHierarchyReply {
objects: self.state.objects.lock().unwrap().clone(),
next_page_token: String::new(),
total_object_count: self.state.objects.lock().unwrap().len() as i32,
}))
}
@@ -409,30 +452,58 @@ mod tests {
#[tokio::test]
async fn discover_hierarchy_returns_objects_with_attributes() {
let state = Arc::new(FakeState::default());
*state.objects.lock().unwrap() = vec![GalaxyObject {
gobject_id: 42,
tag_name: "DelmiaReceiver_001".to_owned(),
contained_name: "DelmiaReceiver".to_owned(),
browse_name: "TestMachine_001/DelmiaReceiver".to_owned(),
parent_gobject_id: 7,
is_area: false,
category_id: 3,
hosted_by_gobject_id: 1,
template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()],
attributes: vec![GalaxyAttribute {
attribute_name: "DownloadPath".to_owned(),
full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(),
mx_data_type: 8,
data_type_name: "MxString".to_owned(),
is_array: false,
array_dimension: 0,
array_dimension_present: false,
mx_attribute_category: 2,
security_classification: 1,
is_historized: false,
is_alarm: false,
}],
}];
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: vec![GalaxyObject {
gobject_id: 42,
tag_name: "DelmiaReceiver_001".to_owned(),
contained_name: "DelmiaReceiver".to_owned(),
browse_name: "TestMachine_001/DelmiaReceiver".to_owned(),
parent_gobject_id: 7,
is_area: false,
category_id: 3,
hosted_by_gobject_id: 1,
template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()],
attributes: vec![GalaxyAttribute {
attribute_name: "DownloadPath".to_owned(),
full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(),
mx_data_type: 8,
data_type_name: "MxString".to_owned(),
is_array: false,
array_dimension: 0,
array_dimension_present: false,
mx_attribute_category: 2,
security_classification: 1,
is_historized: false,
is_alarm: false,
}],
}],
next_page_token: "page-2".to_owned(),
total_object_count: 2,
});
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: vec![GalaxyObject {
gobject_id: 43,
tag_name: "DelmiaReceiver_002".to_owned(),
contained_name: String::new(),
browse_name: String::new(),
parent_gobject_id: 0,
is_area: false,
category_id: 0,
hosted_by_gobject_id: 0,
template_chain: Vec::new(),
attributes: Vec::new(),
}],
next_page_token: String::new(),
total_object_count: 2,
});
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
@@ -441,7 +512,12 @@ mod tests {
let objects = client.discover_hierarchy().await.unwrap();
assert_eq!(objects.len(), 1);
assert_eq!(objects.len(), 2);
let requests = state.discover_requests.lock().unwrap();
assert_eq!(requests.len(), 2);
assert_eq!(requests[0].page_size, 5000);
assert_eq!(requests[0].page_token, "");
assert_eq!(requests[1].page_token, "page-2");
assert_eq!(objects[0].tag_name, "DelmiaReceiver_001");
assert_eq!(objects[0].attributes.len(), 1);
assert_eq!(objects[0].attributes[0].attribute_name, "DownloadPath");
@@ -451,6 +527,37 @@ mod tests {
);
}
#[tokio::test]
async fn discover_hierarchy_rejects_repeated_page_token() {
let state = Arc::new(FakeState::default());
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: Vec::new(),
next_page_token: "7:1".to_owned(),
total_object_count: 1,
});
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: Vec::new(),
next_page_token: "7:1".to_owned(),
total_object_count: 1,
});
let endpoint = spawn_fake(state).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let error = client.discover_hierarchy().await.unwrap_err();
assert!(error.to_string().contains("repeated page token"));
}
#[tokio::test]
async fn watch_deploy_events_yields_events_in_order() {
let state = Arc::new(FakeState::default());
+14
View File
@@ -4,6 +4,8 @@ use std::time::Duration;
use crate::auth::ApiKey;
const DEFAULT_MAX_GRPC_MESSAGE_BYTES: usize = 16 * 1024 * 1024;
#[derive(Clone)]
pub struct ClientOptions {
endpoint: String,
@@ -14,6 +16,7 @@ pub struct ClientOptions {
connect_timeout: Duration,
call_timeout: Duration,
stream_timeout: Option<Duration>,
max_grpc_message_bytes: usize,
}
impl ClientOptions {
@@ -27,6 +30,7 @@ impl ClientOptions {
connect_timeout: Duration::from_secs(10),
call_timeout: Duration::from_secs(30),
stream_timeout: None,
max_grpc_message_bytes: DEFAULT_MAX_GRPC_MESSAGE_BYTES,
}
}
@@ -65,6 +69,11 @@ impl ClientOptions {
self
}
pub fn with_max_grpc_message_bytes(mut self, max_grpc_message_bytes: usize) -> Self {
self.max_grpc_message_bytes = max_grpc_message_bytes;
self
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
@@ -96,6 +105,10 @@ impl ClientOptions {
pub fn stream_timeout(&self) -> Option<Duration> {
self.stream_timeout
}
pub fn max_grpc_message_bytes(&self) -> usize {
self.max_grpc_message_bytes
}
}
impl Default for ClientOptions {
@@ -116,6 +129,7 @@ impl fmt::Debug for ClientOptions {
.field("connect_timeout", &self.connect_timeout)
.field("call_timeout", &self.call_timeout)
.field("stream_timeout", &self.stream_timeout)
.field("max_grpc_message_bytes", &self.max_grpc_message_bytes)
.finish()
}
}
+1 -1
View File
@@ -1,3 +1,3 @@
pub const CLIENT_VERSION: &str = "0.1.0-dev";
pub const GATEWAY_PROTOCOL_VERSION: u32 = 1;
pub const GATEWAY_PROTOCOL_VERSION: u32 = 2;
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
+21 -8
View File
@@ -91,12 +91,15 @@ return ApiKeyVerificationResult.Success(new ApiKeyIdentity(
KeyId: storedKey.KeyId,
KeyPrefix: storedKey.KeyPrefix,
DisplayName: storedKey.DisplayName,
Scopes: storedKey.Scopes));
Scopes: storedKey.Scopes,
Constraints: storedKey.Constraints));
```
`ApiKeyVerificationResult` carries either an `ApiKeyIdentity` or a discriminated `ApiKeyVerificationFailure` value. The failure enum distinguishes parse errors, missing pepper, missing or revoked keys, and secret mismatch so the calling middleware can emit precise audit detail without leaking which check failed to the client.
`ApiKeyIdentity` exposes only non-secret fields (`KeyId`, `KeyPrefix`, `DisplayName`, `Scopes`) and is the type downstream authorization code consumes.
`ApiKeyIdentity` exposes only non-secret fields (`KeyId`, `KeyPrefix`,
`DisplayName`, `Scopes`, and `Constraints`) and is the type downstream
authorization code consumes.
## Storage
@@ -131,7 +134,9 @@ public SqliteConnection CreateConnection()
`SqliteAuthSchema` declares table names and the current schema version as constants. Three tables are involved:
- `api_keys` stores `key_id`, `key_prefix`, the `secret_hash` blob, `display_name`, serialized `scopes`, and the `created_utc`, `last_used_utc`, and `revoked_utc` timestamps.
- `api_keys` stores `key_id`, `key_prefix`, the `secret_hash` blob,
`display_name`, serialized `scopes`, optional serialized `constraints`, and
the `created_utc`, `last_used_utc`, and `revoked_utc` timestamps.
- `api_key_audit` is an append-only log keyed by an autoincrement `audit_id` with `key_id`, `event_type`, `remote_address`, `created_utc`, and `details` columns.
- `schema_version` carries a single row whose `version` column is matched against `SqliteAuthSchema.CurrentVersion`.
@@ -150,9 +155,10 @@ public static ApiKeyRecord Read(SqliteDataReader reader)
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
Constraints: ApiKeyConstraintSerializer.Deserialize(reader.IsDBNull(5) ? null : reader.GetString(5)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(6), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 7),
RevokedUtc: ReadNullableDateTimeOffset(reader, 8));
}
```
@@ -193,8 +199,8 @@ The supported subcommands match `ApiKeyAdminCommandKind` exactly:
| Subcommand | Required options | Behaviour |
|------------|------------------|-----------|
| `init-db` | none | Runs the migrator and records an audit entry. |
| `create-key` | `--key-id`, `--display-name` | Generates a new secret, stores its peppered hash, and prints the assembled `mxgw_<keyId>_<secret>` token. |
| `list-keys` | none | Lists every stored key with its scopes and revocation state. |
| `create-key` | `--key-id`, `--display-name` | Generates a new secret, stores its peppered hash and optional constraints, and prints the assembled `mxgw_<keyId>_<secret>` token. |
| `list-keys` | none | Lists every stored key with its scopes, constraints, and revocation state. |
| `revoke-key` | `--key-id` | Sets `revoked_utc` if the key is currently active. |
| `rotate-key` | `--key-id` | Replaces the secret hash and prints the new token. |
@@ -203,11 +209,18 @@ Examples:
```bash
mxgateway apikey init-db
mxgateway apikey create-key --key-id ops.alice --display-name "Alice (ops)" --scopes read,write
mxgateway apikey create-key --key-id area1.reader --display-name "Area 1 reader" --scopes invoke:read,metadata:read --read-subtree "Area1/*" --browse-subtree "Area1/*"
mxgateway apikey list-keys --json
mxgateway apikey revoke-key --key-id ops.alice
mxgateway apikey rotate-key --key-id ops.alice
```
Constraint flags are optional. `--read-subtree`, `--write-subtree`,
`--read-tag-glob`, `--write-tag-glob`, and `--browse-subtree` are repeatable.
`--max-write-classification` accepts one integer. `--read-alarm-only` and
`--read-historized-only` are boolean flags. Existing rows with null
constraints remain fully unconstrained after migration.
Key ids are restricted by the parser to ASCII letters, digits, periods, and hyphens so they remain safe to embed in the token format and in URL paths used by administrative tooling.
## Scope Serialization
+45 -2
View File
@@ -1,6 +1,8 @@
# Gateway gRPC Authorization
The authorization subsystem enforces per-RPC scope checks against the authenticated `ApiKeyIdentity` produced by the authentication layer, so service implementations never need to repeat permission logic.
The authorization subsystem has two layers. The gRPC interceptor enforces the
verb scope required by the RPC. Service-layer constraint checks then narrow
what an authenticated API key can browse, read, or write inside the Galaxy.
## Overview
@@ -12,6 +14,8 @@ The participating types live under `src/MxGateway.Server/Security/Authorization/
- `GatewayGrpcScopeResolver` maps a request message (and, for `MxCommandRequest`, the inner `MxCommandKind`) to the scope string that must be present on the caller.
- `GatewayScopes` exposes the canonical scope constants used by the resolver and any downstream consumer.
- `GatewayRequestIdentityAccessor` and `IGatewayRequestIdentityAccessor` expose the verified identity to handlers and any service code that runs inside the call.
- `IConstraintEnforcer` applies optional API-key constraints against the
cached Galaxy hierarchy from service bodies.
- `GrpcAuthorizationServiceCollectionExtensions` wires the components into the DI container and the gRPC pipeline.
The `ApiKeyIdentity` consumed here is produced by the authentication layer; see [Authentication](./Authentication.md) for how it is built and how scopes are persisted.
@@ -21,7 +25,9 @@ The `ApiKeyIdentity` consumed here is produced by the authentication layer; see
Centralizing the policy in `GatewayGrpcAuthorizationInterceptor` produces three concrete benefits:
1. Every RPC defined in `MxAccessGatewayService` is covered by construction. A new RPC inherits the check the moment its request type is added to `GatewayGrpcScopeResolver`, instead of relying on each service method to remember to call an authorization helper.
2. The service class stays a thin translator between proto contracts and domain calls. RPC methods do not branch on identity or scope, which keeps the AGENTS.md guideline that gRPC handlers contain no policy.
2. Verb-scope policy stays centralized. Request-specific constraints still run
in service bodies because they need command payloads, item handles, and
Galaxy metadata that the interceptor should not inspect.
3. Authentication and authorization happen in one place, so the gRPC `Status` mapping is consistent. A failed key check always returns `Unauthenticated`, and a missing scope always returns `PermissionDenied` with the offending scope name.
## Interceptor Flow
@@ -131,6 +137,43 @@ private static string ResolveCommandScope(MxCommandKind kind)
Reads (`Register`, `AddItem`, `Advise`, and any other unspecified kind) fall through to `InvokeRead`, which keeps the matrix small while still separating reads from writes, secured writes, metadata lookups, event drains, and worker shutdown.
## Constraint Enforcement
`ApiKeyIdentity.Constraints` is optional. Empty constraints preserve the
previous behavior: the key is authorized only by its verb scopes. Non-empty
constraints are stored as JSON in `api_keys.constraints` and are applied by
`IConstraintEnforcer` after the interceptor succeeds.
Supported constraints are:
| Constraint | Meaning |
|------------|---------|
| `read_subtrees` | Contained-path globs allowed for read/subscription commands. |
| `write_subtrees` | Contained-path globs allowed for write commands. |
| `read_tag_globs` | Tag-address globs allowed for read/subscription commands. |
| `write_tag_globs` | Tag-address globs allowed for write commands. |
| `max_write_classification` | Maximum Galaxy attribute `security_classification` a key may write. |
| `browse_subtrees` | Contained-path globs used to filter Galaxy browse results and deploy-event counts. |
| `read_alarm_only` | Read/subscription commands must target objects with alarm-bearing attributes. |
| `read_historized_only` | Read/subscription commands must target objects with historized attributes. |
Glob matching is anchored, case-insensitive, and supports `*` and `?`.
Subtree and tag glob lists are alternatives: matching either list allows that
scope dimension. Empty lists mean unconstrained for that dimension.
The service checks read constraints for `AddItem`, `AddItem2`, `AddItemBulk`,
`SubscribeBulk`, and `AdviseItemBulk`. It checks write constraints for
`Write`, `Write2`, `WriteSecured`, and `WriteSecured2`. Successful item
registrations are tracked per session so later item-handle commands resolve
back to the original tag address. If a constrained key presents an unknown item
handle, the gateway fails closed.
Non-bulk constraint failures return gRPC `PermissionDenied`. Bulk read
commands preserve input order and return a failed `SubscribeResult` for each
denied item while still forwarding allowed items to the worker. Every denial
adds an `api_key_audit` entry with the key id, command kind, target, and
blocking constraint; secured values and raw credentials are never logged.
## Scope Catalog
`GatewayScopes` is the single source of truth for scope strings. Every entry is currently mapped by either the resolver or another security component:
+60 -13
View File
@@ -32,13 +32,23 @@ The service is defined in
|-----|---------|
| `TestConnection` | Connectivity probe. Returns `{ ok: bool }` after a `SELECT 1`. Does not throw on SQL failure — returns `ok = false`. Always hits SQL directly so it remains a true health check. |
| `GetLastDeployTime` | Returns the cached `galaxy.time_of_last_deploy`. Served from the shared hierarchy cache; refreshed in the background. |
| `DiscoverHierarchy` | Returns the full deployed hierarchy plus every object's dynamic attributes. **Served from cache** — see [Hierarchy Cache](#hierarchy-cache). |
| `DiscoverHierarchy` | Returns one page of the deployed hierarchy plus each returned object's dynamic attributes. **Served from cache** — see [Hierarchy Cache](#hierarchy-cache). |
| `WatchDeployEvents` | **Server-streaming.** The server emits the current state immediately on subscribe (so clients can bootstrap without waiting), then emits one event per detected deploy change. See [Deploy Notifications](#deploy-notifications). |
`DiscoverHierarchy` is intentionally a single unary RPC rather than a stream:
the row set is small (thousands of objects, low tens-of-thousands of
attributes for typical Galaxies) and clients almost always want the whole tree
at once.
`DiscoverHierarchy` is a paged unary RPC. The raw request accepts `page_size`
and `page_token`; the server defaults omitted page size to 1000 objects and
caps every page at 5000 objects. Page tokens bind to the cache sequence and the
active filter set, so changing filters between pages returns `InvalidArgument`
instead of mixing snapshots. Official high-level clients preserve the older
"return the full hierarchy" behavior by looping pages internally.
The request can also slice the cached hierarchy without running new SQL. A
caller may choose one root (`root_gobject_id`, `root_tag_name`, or
`root_contained_path`) and may combine that with `max_depth`, category ids,
template-chain substring filters, an anchored case-insensitive tag-name glob,
alarm-only, historized-only, and `include_attributes = false` for a skeleton
tree. All filters are applied with AND semantics, and `total_object_count`
reports the post-filter count.
## Hierarchy Cache
@@ -56,12 +66,14 @@ Refresh strategy is **deploy-time gated**:
3. If the deploy timestamp is unchanged, the heavy hierarchy + attributes
queries are **skipped**. The cache simply marks `LastSuccessAt`.
4. If the deploy timestamp changed (or no data has loaded yet), the cache
pulls hierarchy + attributes, materializes a `DiscoverHierarchyReply`
once, replaces the entry atomically, and publishes a deploy event.
pulls hierarchy + attributes, materializes a Galaxy object list plus a
dashboard summary once, replaces the entry atomically, and publishes a
deploy event.
Materializing the reply at refresh time means subsequent `DiscoverHierarchy`
calls return a pre-built proto message — no per-request projection, no
per-request allocations beyond the gRPC serializer's frame.
Materializing objects and dashboard summaries at refresh time means subsequent
`DiscoverHierarchy` calls page over an immutable object list. The dashboard
uses the precomputed summary and does not rescan raw SQL rowsets on each
snapshot.
When SQL is unreachable, the cache retains the previous data and flips
`Status` to `Stale` (or `Unavailable` if no data was ever loaded). A
@@ -139,6 +151,29 @@ message GalaxyAttribute {
bool is_historized = 10;
bool is_alarm = 11;
}
message DiscoverHierarchyRequest {
int32 page_size = 1; // omitted/0 uses the server default of 1000
string page_token = 2; // opaque token returned by the previous page
oneof root {
int32 root_gobject_id = 3;
string root_tag_name = 4;
string root_contained_path = 5;
}
google.protobuf.Int32Value max_depth = 6;
repeated int32 category_ids = 7;
repeated string template_chain_contains = 8;
string tag_name_glob = 9;
optional bool include_attributes = 10;
bool alarm_bearing_only = 11;
bool historized_only = 12;
}
message DiscoverHierarchyReply {
repeated GalaxyObject objects = 1;
string next_page_token = 2;
int32 total_object_count = 3;
}
```
### Contained Name vs Tag Name
@@ -176,7 +211,8 @@ GalaxyHierarchyRefreshService (BackgroundService)
-> GalaxyRepository.GetLastDeployTimeAsync (cheap, every tick)
-> GalaxyRepository.GetHierarchyAsync (only on deploy change)
-> GalaxyRepository.GetAttributesAsync (only on deploy change)
-> GalaxyProtoMapper.MapObject (materialize DiscoverHierarchyReply once)
-> GalaxyProtoMapper.MapObject (materialize GalaxyObject list once)
-> DashboardGalaxySummary (precompute dashboard counts once)
-> IGalaxyDeployNotifier.Publish (only on deploy change)
```
@@ -189,8 +225,9 @@ Component breakdown:
recursive CTEs and pick the most-derived attribute override per object.
- `GalaxyHierarchyCache`
(`src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs`) holds the most
recent immutable `GalaxyHierarchyCacheEntry` (rows + materialized proto
reply + counts + status). All gRPC clients share the same entry.
recent immutable `GalaxyHierarchyCacheEntry` (materialized objects +
precomputed dashboard summary + counts + status). All gRPC clients share the
same entry.
- `GalaxyHierarchyRefreshService`
(`src/MxGateway.Server/Galaxy/GalaxyHierarchyRefreshService.cs`) is a
hosted `BackgroundService` that drives `RefreshAsync` on the configured
@@ -220,6 +257,11 @@ Security`), but production deployments that use SQL authentication should set
the override via environment variable rather than committing credentials to
`appsettings.json`.
The dashboard parses this connection string and displays only non-secret
fields: server, database, integrated security, encrypt, and trust-server-
certificate. It never displays user id, password, access token, or arbitrary
unparsed connection string text.
## Authorization
All four Galaxy RPCs (including `WatchDeployEvents`) require the
@@ -228,6 +270,11 @@ privilege to `MxCommandKind.GetSessionState` or `MxCommandKind.GetWorkerInfo`.
The mapping lives in `GatewayGrpcScopeResolver`; see
[Authorization](./Authorization.md) for the full scope catalog.
API keys can also carry `browse_subtrees` constraints. `DiscoverHierarchy`
intersects those contained-path globs with the caller's request filters.
`WatchDeployEvents` still emits deploy notifications, but its object and
attribute counts are scoped to the caller's browsable subtrees.
A request without an API key returns `Unauthenticated`. A request with a key
that lacks `metadata:read` returns `PermissionDenied` with the missing scope
embedded in the status detail.
+7 -1
View File
@@ -35,6 +35,8 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
"DefaultCommandTimeoutSeconds": 30,
"MaxSessions": 64,
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
},
"Events": {
@@ -52,7 +54,8 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
"ShowTagValues": false
},
"Protocol": {
"WorkerProtocolVersion": 1
"WorkerProtocolVersion": 1,
"MaxGrpcMessageBytes": 16777216
},
"Galaxy": {
"ConnectionString": "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;",
@@ -107,6 +110,8 @@ to avoid accidental large allocations from malformed or oversized frames.
| `MxGateway:Sessions:DefaultCommandTimeoutSeconds` | `30` | Default timeout used while the gateway waits for a worker command reply when an open-session request does not provide a positive command timeout. |
| `MxGateway:Sessions:MaxSessions` | `64` | Maximum number of concurrently open gateway sessions. Session opens reserve a slot atomically before worker creation. |
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. |
All numeric session options must be greater than zero. The current event stream
@@ -146,6 +151,7 @@ and `RecentSessionLimit` must be greater than or equal to zero.
| Option | Default | Description |
|--------|---------|-------------|
| `MxGateway:Protocol:WorkerProtocolVersion` | `1` | Worker IPC protocol version expected by the gateway and worker. This must match `GatewayContractInfo.WorkerProtocolVersion`. |
| `MxGateway:Protocol:MaxGrpcMessageBytes` | `16777216` | Public gRPC max send and receive message size in bytes. The same default is used by official clients. The validator allows values from `1024` through `268435456`. |
The protocol option is exposed for diagnostics and explicit deployment
configuration, not for compatibility negotiation. A mismatch fails validation
+5
View File
@@ -31,6 +31,11 @@ A second gRPC service, `GalaxyRepositoryGrpcService`, is mapped alongside it. It
`MxAccessGatewayService` derives from the generated `MxAccessGateway.MxAccessGatewayBase` and implements every RPC declared in `mxaccess_gateway.proto`. The proto contract itself is documented in [Contracts](./Contracts.md); this section covers only what the server-side handler does on top of that contract.
Public gRPC send and receive message sizes are configured from
`MxGateway:Protocol:MaxGrpcMessageBytes` (default 16 MiB). Official clients use
the same default so paged Galaxy browse replies and larger MXAccess payloads
fail consistently instead of depending on language-specific gRPC defaults.
### `OpenSession`
`OpenSession` validates the request, asks `ISessionManager` to open a session under the caller's identity, and returns a reply that advertises both protocol versions and the capabilities the gateway supports. Capability strings are static because the gateway has a fixed feature set per build; clients use them as a forward-compatibility hint rather than runtime negotiation.
+2 -2
View File
@@ -178,9 +178,9 @@ The order — fault, deregister, dispose, release slot, record metric, log, reth
While `Ready`, callers reach the worker through `SessionManager.InvokeAsync` or `ReadEventsAsync`. Both delegate to `GatewaySession`, which checks the state under lock and updates `LastClientActivityAt` on every invocation. `GatewaySession` also exposes typed bulk helpers (`AddItemBulkAsync`, `SubscribeBulkAsync`, etc.) that wrap `WorkerCommand` round-trips and translate non-`Ok` `ProtocolStatus` replies into `SessionManagerException` with `SessionNotReady`.
Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false the second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel.
Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false the second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. Active event subscribers keep the session lease from expiring until the stream is disposed.
`ExtendLease` and `IsLeaseExpired` cooperate with `SessionManager.CloseExpiredLeasesAsync`, which iterates a registry snapshot and closes any session whose lease has expired with `LeaseExpiredReason`.
Sessions open with `MxGateway:Sessions:DefaultLeaseSeconds` (default 1800) added to the open timestamp. Unary client activity refreshes the lease by the same duration. `ExtendLease` and `IsLeaseExpired` cooperate with `SessionManager.CloseExpiredLeasesAsync`, which iterates a registry snapshot and closes any session whose lease has expired with `LeaseExpiredReason`. `SessionLeaseMonitorHostedService` runs that sweep every `MxGateway:Sessions:LeaseSweepIntervalSeconds` seconds (default 30).
### Close
@@ -6,7 +6,7 @@ namespace MxGateway.Contracts;
/// </summary>
public static class GatewayContractInfo
{
public const uint GatewayProtocolVersion = 1;
public const uint GatewayProtocolVersion = 2;
public const uint WorkerProtocolVersion = 1;
@@ -25,54 +25,64 @@ namespace MxGateway.Contracts.Proto.Galaxy {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"ChdnYWxheHlfcmVwb3NpdG9yeS5wcm90bxIUZ2FsYXh5X3JlcG9zaXRvcnku",
"djEaH2dvb2dsZS9wcm90b2J1Zi90aW1lc3RhbXAucHJvdG8iFwoVVGVzdENv",
"bm5lY3Rpb25SZXF1ZXN0IiEKE1Rlc3RDb25uZWN0aW9uUmVwbHkSCgoCb2sY",
"ASABKAgiGgoYR2V0TGFzdERlcGxveVRpbWVSZXF1ZXN0ImIKFkdldExhc3RE",
"ZXBsb3lUaW1lUmVwbHkSDwoHcHJlc2VudBgBIAEoCBI3ChN0aW1lX29mX2xh",
"c3RfZGVwbG95GAIgASgLMhouZ29vZ2xlLnByb3RvYnVmLlRpbWVzdGFtcCIa",
"ChhEaXNjb3ZlckhpZXJhcmNoeVJlcXVlc3QiTQoWRGlzY292ZXJIaWVyYXJj",
"aHlSZXBseRIzCgdvYmplY3RzGAEgAygLMiIuZ2FsYXh5X3JlcG9zaXRvcnku",
"djEuR2FsYXh5T2JqZWN0IlUKGFdhdGNoRGVwbG95RXZlbnRzUmVxdWVzdBI5",
"ChVsYXN0X3NlZW5fZGVwbG95X3RpbWUYASABKAsyGi5nb29nbGUucHJvdG9i",
"dWYuVGltZXN0YW1wIt0BCgtEZXBsb3lFdmVudBIQCghzZXF1ZW5jZRgBIAEo",
"BBIvCgtvYnNlcnZlZF9hdBgCIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1l",
"c3RhbXASNwoTdGltZV9vZl9sYXN0X2RlcGxveRgDIAEoCzIaLmdvb2dsZS5w",
"cm90b2J1Zi5UaW1lc3RhbXASIwobdGltZV9vZl9sYXN0X2RlcGxveV9wcmVz",
"ZW50GAQgASgIEhQKDG9iamVjdF9jb3VudBgFIAEoBRIXCg9hdHRyaWJ1dGVf",
"Y291bnQYBiABKAUikwIKDEdhbGF4eU9iamVjdBISCgpnb2JqZWN0X2lkGAEg",
"ASgFEhAKCHRhZ19uYW1lGAIgASgJEhYKDmNvbnRhaW5lZF9uYW1lGAMgASgJ",
"EhMKC2Jyb3dzZV9uYW1lGAQgASgJEhkKEXBhcmVudF9nb2JqZWN0X2lkGAUg",
"ASgFEg8KB2lzX2FyZWEYBiABKAgSEwoLY2F0ZWdvcnlfaWQYByABKAUSHAoU",
"aG9zdGVkX2J5X2dvYmplY3RfaWQYCCABKAUSFgoOdGVtcGxhdGVfY2hhaW4Y",
"CSADKAkSOQoKYXR0cmlidXRlcxgKIAMoCzIlLmdhbGF4eV9yZXBvc2l0b3J5",
"LnYxLkdhbGF4eUF0dHJpYnV0ZSKoAgoPR2FsYXh5QXR0cmlidXRlEhYKDmF0",
"dHJpYnV0ZV9uYW1lGAEgASgJEhoKEmZ1bGxfdGFnX3JlZmVyZW5jZRgCIAEo",
"CRIUCgxteF9kYXRhX3R5cGUYAyABKAUSFgoOZGF0YV90eXBlX25hbWUYBCAB",
"KAkSEAoIaXNfYXJyYXkYBSABKAgSFwoPYXJyYXlfZGltZW5zaW9uGAYgASgF",
"Eh8KF2FycmF5X2RpbWVuc2lvbl9wcmVzZW50GAcgASgIEh0KFW14X2F0dHJp",
"YnV0ZV9jYXRlZ29yeRgIIAEoBRIfChdzZWN1cml0eV9jbGFzc2lmaWNhdGlv",
"bhgJIAEoBRIVCg1pc19oaXN0b3JpemVkGAogASgIEhAKCGlzX2FsYXJtGAsg",
"ASgIMswDChBHYWxheHlSZXBvc2l0b3J5EmgKDlRlc3RDb25uZWN0aW9uEisu",
"Z2FsYXh5X3JlcG9zaXRvcnkudjEuVGVzdENvbm5lY3Rpb25SZXF1ZXN0Giku",
"Z2FsYXh5X3JlcG9zaXRvcnkudjEuVGVzdENvbm5lY3Rpb25SZXBseRJxChFH",
"ZXRMYXN0RGVwbG95VGltZRIuLmdhbGF4eV9yZXBvc2l0b3J5LnYxLkdldExh",
"c3REZXBsb3lUaW1lUmVxdWVzdBosLmdhbGF4eV9yZXBvc2l0b3J5LnYxLkdl",
"dExhc3REZXBsb3lUaW1lUmVwbHkScQoRRGlzY292ZXJIaWVyYXJjaHkSLi5n",
"YWxheHlfcmVwb3NpdG9yeS52MS5EaXNjb3ZlckhpZXJhcmNoeVJlcXVlc3Qa",
"LC5nYWxheHlfcmVwb3NpdG9yeS52MS5EaXNjb3ZlckhpZXJhcmNoeVJlcGx5",
"EmgKEVdhdGNoRGVwbG95RXZlbnRzEi4uZ2FsYXh5X3JlcG9zaXRvcnkudjEu",
"V2F0Y2hEZXBsb3lFdmVudHNSZXF1ZXN0GiEuZ2FsYXh5X3JlcG9zaXRvcnku",
"djEuRGVwbG95RXZlbnQwAUIjqgIgTXhHYXRld2F5LkNvbnRyYWN0cy5Qcm90",
"by5HYWxheHliBnByb3RvMw=="));
"djEaH2dvb2dsZS9wcm90b2J1Zi90aW1lc3RhbXAucHJvdG8aHmdvb2dsZS9w",
"cm90b2J1Zi93cmFwcGVycy5wcm90byIXChVUZXN0Q29ubmVjdGlvblJlcXVl",
"c3QiIQoTVGVzdENvbm5lY3Rpb25SZXBseRIKCgJvaxgBIAEoCCIaChhHZXRM",
"YXN0RGVwbG95VGltZVJlcXVlc3QiYgoWR2V0TGFzdERlcGxveVRpbWVSZXBs",
"eRIPCgdwcmVzZW50GAEgASgIEjcKE3RpbWVfb2ZfbGFzdF9kZXBsb3kYAiAB",
"KAsyGi5nb29nbGUucHJvdG9idWYuVGltZXN0YW1wIocDChhEaXNjb3Zlckhp",
"ZXJhcmNoeVJlcXVlc3QSEQoJcGFnZV9zaXplGAEgASgFEhIKCnBhZ2VfdG9r",
"ZW4YAiABKAkSGQoPcm9vdF9nb2JqZWN0X2lkGAMgASgFSAASFwoNcm9vdF90",
"YWdfbmFtZRgEIAEoCUgAEh0KE3Jvb3RfY29udGFpbmVkX3BhdGgYBSABKAlI",
"ABIuCgltYXhfZGVwdGgYBiABKAsyGy5nb29nbGUucHJvdG9idWYuSW50MzJW",
"YWx1ZRIUCgxjYXRlZ29yeV9pZHMYByADKAUSHwoXdGVtcGxhdGVfY2hhaW5f",
"Y29udGFpbnMYCCADKAkSFQoNdGFnX25hbWVfZ2xvYhgJIAEoCRIfChJpbmNs",
"dWRlX2F0dHJpYnV0ZXMYCiABKAhIAYgBARIaChJhbGFybV9iZWFyaW5nX29u",
"bHkYCyABKAgSFwoPaGlzdG9yaXplZF9vbmx5GAwgASgIQgYKBHJvb3RCFQoT",
"X2luY2x1ZGVfYXR0cmlidXRlcyKCAQoWRGlzY292ZXJIaWVyYXJjaHlSZXBs",
"eRIzCgdvYmplY3RzGAEgAygLMiIuZ2FsYXh5X3JlcG9zaXRvcnkudjEuR2Fs",
"YXh5T2JqZWN0EhcKD25leHRfcGFnZV90b2tlbhgCIAEoCRIaChJ0b3RhbF9v",
"YmplY3RfY291bnQYAyABKAUiVQoYV2F0Y2hEZXBsb3lFdmVudHNSZXF1ZXN0",
"EjkKFWxhc3Rfc2Vlbl9kZXBsb3lfdGltZRgBIAEoCzIaLmdvb2dsZS5wcm90",
"b2J1Zi5UaW1lc3RhbXAi3QEKC0RlcGxveUV2ZW50EhAKCHNlcXVlbmNlGAEg",
"ASgEEi8KC29ic2VydmVkX2F0GAIgASgLMhouZ29vZ2xlLnByb3RvYnVmLlRp",
"bWVzdGFtcBI3ChN0aW1lX29mX2xhc3RfZGVwbG95GAMgASgLMhouZ29vZ2xl",
"LnByb3RvYnVmLlRpbWVzdGFtcBIjCht0aW1lX29mX2xhc3RfZGVwbG95X3By",
"ZXNlbnQYBCABKAgSFAoMb2JqZWN0X2NvdW50GAUgASgFEhcKD2F0dHJpYnV0",
"ZV9jb3VudBgGIAEoBSKTAgoMR2FsYXh5T2JqZWN0EhIKCmdvYmplY3RfaWQY",
"ASABKAUSEAoIdGFnX25hbWUYAiABKAkSFgoOY29udGFpbmVkX25hbWUYAyAB",
"KAkSEwoLYnJvd3NlX25hbWUYBCABKAkSGQoRcGFyZW50X2dvYmplY3RfaWQY",
"BSABKAUSDwoHaXNfYXJlYRgGIAEoCBITCgtjYXRlZ29yeV9pZBgHIAEoBRIc",
"ChRob3N0ZWRfYnlfZ29iamVjdF9pZBgIIAEoBRIWCg50ZW1wbGF0ZV9jaGFp",
"bhgJIAMoCRI5CgphdHRyaWJ1dGVzGAogAygLMiUuZ2FsYXh5X3JlcG9zaXRv",
"cnkudjEuR2FsYXh5QXR0cmlidXRlIqgCCg9HYWxheHlBdHRyaWJ1dGUSFgoO",
"YXR0cmlidXRlX25hbWUYASABKAkSGgoSZnVsbF90YWdfcmVmZXJlbmNlGAIg",
"ASgJEhQKDG14X2RhdGFfdHlwZRgDIAEoBRIWCg5kYXRhX3R5cGVfbmFtZRgE",
"IAEoCRIQCghpc19hcnJheRgFIAEoCBIXCg9hcnJheV9kaW1lbnNpb24YBiAB",
"KAUSHwoXYXJyYXlfZGltZW5zaW9uX3ByZXNlbnQYByABKAgSHQoVbXhfYXR0",
"cmlidXRlX2NhdGVnb3J5GAggASgFEh8KF3NlY3VyaXR5X2NsYXNzaWZpY2F0",
"aW9uGAkgASgFEhUKDWlzX2hpc3Rvcml6ZWQYCiABKAgSEAoIaXNfYWxhcm0Y",
"CyABKAgyzAMKEEdhbGF4eVJlcG9zaXRvcnkSaAoOVGVzdENvbm5lY3Rpb24S",
"Ky5nYWxheHlfcmVwb3NpdG9yeS52MS5UZXN0Q29ubmVjdGlvblJlcXVlc3Qa",
"KS5nYWxheHlfcmVwb3NpdG9yeS52MS5UZXN0Q29ubmVjdGlvblJlcGx5EnEK",
"EUdldExhc3REZXBsb3lUaW1lEi4uZ2FsYXh5X3JlcG9zaXRvcnkudjEuR2V0",
"TGFzdERlcGxveVRpbWVSZXF1ZXN0GiwuZ2FsYXh5X3JlcG9zaXRvcnkudjEu",
"R2V0TGFzdERlcGxveVRpbWVSZXBseRJxChFEaXNjb3ZlckhpZXJhcmNoeRIu",
"LmdhbGF4eV9yZXBvc2l0b3J5LnYxLkRpc2NvdmVySGllcmFyY2h5UmVxdWVz",
"dBosLmdhbGF4eV9yZXBvc2l0b3J5LnYxLkRpc2NvdmVySGllcmFyY2h5UmVw",
"bHkSaAoRV2F0Y2hEZXBsb3lFdmVudHMSLi5nYWxheHlfcmVwb3NpdG9yeS52",
"MS5XYXRjaERlcGxveUV2ZW50c1JlcXVlc3QaIS5nYWxheHlfcmVwb3NpdG9y",
"eS52MS5EZXBsb3lFdmVudDABQiOqAiBNeEdhdGV3YXkuQ29udHJhY3RzLlBy",
"b3RvLkdhbGF4eWIGcHJvdG8z"));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, },
new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.WrappersReflection.Descriptor, },
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.TestConnectionRequest), global::MxGateway.Contracts.Proto.Galaxy.TestConnectionRequest.Parser, null, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.TestConnectionReply), global::MxGateway.Contracts.Proto.Galaxy.TestConnectionReply.Parser, new[]{ "Ok" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.GetLastDeployTimeRequest), global::MxGateway.Contracts.Proto.Galaxy.GetLastDeployTimeRequest.Parser, null, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.GetLastDeployTimeReply), global::MxGateway.Contracts.Proto.Galaxy.GetLastDeployTimeReply.Parser, new[]{ "Present", "TimeOfLastDeploy" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyRequest), global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyRequest.Parser, null, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyReply), global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyReply.Parser, new[]{ "Objects" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyRequest), global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyRequest.Parser, new[]{ "PageSize", "PageToken", "RootGobjectId", "RootTagName", "RootContainedPath", "MaxDepth", "CategoryIds", "TemplateChainContains", "TagNameGlob", "IncludeAttributes", "AlarmBearingOnly", "HistorizedOnly" }, new[]{ "Root", "IncludeAttributes" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyReply), global::MxGateway.Contracts.Proto.Galaxy.DiscoverHierarchyReply.Parser, new[]{ "Objects", "NextPageToken", "TotalObjectCount" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.WatchDeployEventsRequest), global::MxGateway.Contracts.Proto.Galaxy.WatchDeployEventsRequest.Parser, new[]{ "LastSeenDeployTime" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.DeployEvent), global::MxGateway.Contracts.Proto.Galaxy.DeployEvent.Parser, new[]{ "Sequence", "ObservedAt", "TimeOfLastDeploy", "TimeOfLastDeployPresent", "ObjectCount", "AttributeCount" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::MxGateway.Contracts.Proto.Galaxy.GalaxyObject), global::MxGateway.Contracts.Proto.Galaxy.GalaxyObject.Parser, new[]{ "GobjectId", "TagName", "ContainedName", "BrowseName", "ParentGobjectId", "IsArea", "CategoryId", "HostedByGobjectId", "TemplateChain", "Attributes" }, null, null, null, null),
@@ -855,6 +865,7 @@ namespace MxGateway.Contracts.Proto.Galaxy {
{
private static readonly pb::MessageParser<DiscoverHierarchyRequest> _parser = new pb::MessageParser<DiscoverHierarchyRequest>(() => new DiscoverHierarchyRequest());
private pb::UnknownFieldSet _unknownFields;
private int _hasBits0;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public static pb::MessageParser<DiscoverHierarchyRequest> Parser { get { return _parser; } }
@@ -882,6 +893,28 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public DiscoverHierarchyRequest(DiscoverHierarchyRequest other) : this() {
_hasBits0 = other._hasBits0;
pageSize_ = other.pageSize_;
pageToken_ = other.pageToken_;
MaxDepth = other.MaxDepth;
categoryIds_ = other.categoryIds_.Clone();
templateChainContains_ = other.templateChainContains_.Clone();
tagNameGlob_ = other.tagNameGlob_;
includeAttributes_ = other.includeAttributes_;
alarmBearingOnly_ = other.alarmBearingOnly_;
historizedOnly_ = other.historizedOnly_;
switch (other.RootCase) {
case RootOneofCase.RootGobjectId:
RootGobjectId = other.RootGobjectId;
break;
case RootOneofCase.RootTagName:
RootTagName = other.RootTagName;
break;
case RootOneofCase.RootContainedPath:
RootContainedPath = other.RootContainedPath;
break;
}
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
@@ -891,6 +924,258 @@ namespace MxGateway.Contracts.Proto.Galaxy {
return new DiscoverHierarchyRequest(this);
}
/// <summary>Field number for the "page_size" field.</summary>
public const int PageSizeFieldNumber = 1;
private int pageSize_;
/// <summary>
/// Maximum number of objects to return. The server applies its default when
/// unset and rejects non-positive values.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int PageSize {
get { return pageSize_; }
set {
pageSize_ = value;
}
}
/// <summary>Field number for the "page_token" field.</summary>
public const int PageTokenFieldNumber = 2;
private string pageToken_ = "";
/// <summary>
/// Opaque token returned by a previous DiscoverHierarchy response.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string PageToken {
get { return pageToken_; }
set {
pageToken_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
/// <summary>Field number for the "root_gobject_id" field.</summary>
public const int RootGobjectIdFieldNumber = 3;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int RootGobjectId {
get { return HasRootGobjectId ? (int) root_ : 0; }
set {
root_ = value;
rootCase_ = RootOneofCase.RootGobjectId;
}
}
/// <summary>Gets whether the "root_gobject_id" field is set</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool HasRootGobjectId {
get { return rootCase_ == RootOneofCase.RootGobjectId; }
}
/// <summary> Clears the value of the oneof if it's currently set to "root_gobject_id" </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void ClearRootGobjectId() {
if (HasRootGobjectId) {
ClearRoot();
}
}
/// <summary>Field number for the "root_tag_name" field.</summary>
public const int RootTagNameFieldNumber = 4;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string RootTagName {
get { return HasRootTagName ? (string) root_ : ""; }
set {
root_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
rootCase_ = RootOneofCase.RootTagName;
}
}
/// <summary>Gets whether the "root_tag_name" field is set</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool HasRootTagName {
get { return rootCase_ == RootOneofCase.RootTagName; }
}
/// <summary> Clears the value of the oneof if it's currently set to "root_tag_name" </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void ClearRootTagName() {
if (HasRootTagName) {
ClearRoot();
}
}
/// <summary>Field number for the "root_contained_path" field.</summary>
public const int RootContainedPathFieldNumber = 5;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string RootContainedPath {
get { return HasRootContainedPath ? (string) root_ : ""; }
set {
root_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
rootCase_ = RootOneofCase.RootContainedPath;
}
}
/// <summary>Gets whether the "root_contained_path" field is set</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool HasRootContainedPath {
get { return rootCase_ == RootOneofCase.RootContainedPath; }
}
/// <summary> Clears the value of the oneof if it's currently set to "root_contained_path" </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void ClearRootContainedPath() {
if (HasRootContainedPath) {
ClearRoot();
}
}
/// <summary>Field number for the "max_depth" field.</summary>
public const int MaxDepthFieldNumber = 6;
private static readonly pb::FieldCodec<int?> _single_maxDepth_codec = pb::FieldCodec.ForStructWrapper<int>(50);
private int? maxDepth_;
/// <summary>
/// Optional. Cap on descendant depth from root. Zero returns only the root.
/// Unset means unlimited depth.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int? MaxDepth {
get { return maxDepth_; }
set {
maxDepth_ = value;
}
}
/// <summary>Field number for the "category_ids" field.</summary>
public const int CategoryIdsFieldNumber = 7;
private static readonly pb::FieldCodec<int> _repeated_categoryIds_codec
= pb::FieldCodec.ForInt32(58);
private readonly pbc::RepeatedField<int> categoryIds_ = new pbc::RepeatedField<int>();
/// <summary>
/// Optional object category id filters.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public pbc::RepeatedField<int> CategoryIds {
get { return categoryIds_; }
}
/// <summary>Field number for the "template_chain_contains" field.</summary>
public const int TemplateChainContainsFieldNumber = 8;
private static readonly pb::FieldCodec<string> _repeated_templateChainContains_codec
= pb::FieldCodec.ForString(66);
private readonly pbc::RepeatedField<string> templateChainContains_ = new pbc::RepeatedField<string>();
/// <summary>
/// Optional case-insensitive substring filters against template names.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public pbc::RepeatedField<string> TemplateChainContains {
get { return templateChainContains_; }
}
/// <summary>Field number for the "tag_name_glob" field.</summary>
public const int TagNameGlobFieldNumber = 9;
private string tagNameGlob_ = "";
/// <summary>
/// Optional anchored, case-insensitive glob over object tag_name.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string TagNameGlob {
get { return tagNameGlob_; }
set {
tagNameGlob_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
/// <summary>Field number for the "include_attributes" field.</summary>
public const int IncludeAttributesFieldNumber = 10;
private readonly static bool IncludeAttributesDefaultValue = false;
private bool includeAttributes_;
/// <summary>
/// Optional. Unset or true includes attributes. False returns object skeletons.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool IncludeAttributes {
get { if ((_hasBits0 & 1) != 0) { return includeAttributes_; } else { return IncludeAttributesDefaultValue; } }
set {
_hasBits0 |= 1;
includeAttributes_ = value;
}
}
/// <summary>Gets whether the "include_attributes" field is set</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool HasIncludeAttributes {
get { return (_hasBits0 & 1) != 0; }
}
/// <summary>Clears the value of the "include_attributes" field</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void ClearIncludeAttributes() {
_hasBits0 &= ~1;
}
/// <summary>Field number for the "alarm_bearing_only" field.</summary>
public const int AlarmBearingOnlyFieldNumber = 11;
private bool alarmBearingOnly_;
/// <summary>
/// Optional. Return only objects with at least one alarm-bearing attribute.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool AlarmBearingOnly {
get { return alarmBearingOnly_; }
set {
alarmBearingOnly_ = value;
}
}
/// <summary>Field number for the "historized_only" field.</summary>
public const int HistorizedOnlyFieldNumber = 12;
private bool historizedOnly_;
/// <summary>
/// Optional. Return only objects with at least one historized attribute.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool HistorizedOnly {
get { return historizedOnly_; }
set {
historizedOnly_ = value;
}
}
private object root_;
/// <summary>Enum of possible cases for the "root" oneof.</summary>
public enum RootOneofCase {
None = 0,
RootGobjectId = 3,
RootTagName = 4,
RootContainedPath = 5,
}
private RootOneofCase rootCase_ = RootOneofCase.None;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public RootOneofCase RootCase {
get { return rootCase_; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void ClearRoot() {
rootCase_ = RootOneofCase.None;
root_ = null;
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
@@ -906,6 +1191,19 @@ namespace MxGateway.Contracts.Proto.Galaxy {
if (ReferenceEquals(other, this)) {
return true;
}
if (PageSize != other.PageSize) return false;
if (PageToken != other.PageToken) return false;
if (RootGobjectId != other.RootGobjectId) return false;
if (RootTagName != other.RootTagName) return false;
if (RootContainedPath != other.RootContainedPath) return false;
if (MaxDepth != other.MaxDepth) return false;
if(!categoryIds_.Equals(other.categoryIds_)) return false;
if(!templateChainContains_.Equals(other.templateChainContains_)) return false;
if (TagNameGlob != other.TagNameGlob) return false;
if (IncludeAttributes != other.IncludeAttributes) return false;
if (AlarmBearingOnly != other.AlarmBearingOnly) return false;
if (HistorizedOnly != other.HistorizedOnly) return false;
if (RootCase != other.RootCase) return false;
return Equals(_unknownFields, other._unknownFields);
}
@@ -913,6 +1211,19 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override int GetHashCode() {
int hash = 1;
if (PageSize != 0) hash ^= PageSize.GetHashCode();
if (PageToken.Length != 0) hash ^= PageToken.GetHashCode();
if (HasRootGobjectId) hash ^= RootGobjectId.GetHashCode();
if (HasRootTagName) hash ^= RootTagName.GetHashCode();
if (HasRootContainedPath) hash ^= RootContainedPath.GetHashCode();
if (maxDepth_ != null) hash ^= MaxDepth.GetHashCode();
hash ^= categoryIds_.GetHashCode();
hash ^= templateChainContains_.GetHashCode();
if (TagNameGlob.Length != 0) hash ^= TagNameGlob.GetHashCode();
if (HasIncludeAttributes) hash ^= IncludeAttributes.GetHashCode();
if (AlarmBearingOnly != false) hash ^= AlarmBearingOnly.GetHashCode();
if (HistorizedOnly != false) hash ^= HistorizedOnly.GetHashCode();
hash ^= (int) rootCase_;
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
@@ -931,6 +1242,47 @@ namespace MxGateway.Contracts.Proto.Galaxy {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
output.WriteRawMessage(this);
#else
if (PageSize != 0) {
output.WriteRawTag(8);
output.WriteInt32(PageSize);
}
if (PageToken.Length != 0) {
output.WriteRawTag(18);
output.WriteString(PageToken);
}
if (HasRootGobjectId) {
output.WriteRawTag(24);
output.WriteInt32(RootGobjectId);
}
if (HasRootTagName) {
output.WriteRawTag(34);
output.WriteString(RootTagName);
}
if (HasRootContainedPath) {
output.WriteRawTag(42);
output.WriteString(RootContainedPath);
}
if (maxDepth_ != null) {
_single_maxDepth_codec.WriteTagAndValue(output, MaxDepth);
}
categoryIds_.WriteTo(output, _repeated_categoryIds_codec);
templateChainContains_.WriteTo(output, _repeated_templateChainContains_codec);
if (TagNameGlob.Length != 0) {
output.WriteRawTag(74);
output.WriteString(TagNameGlob);
}
if (HasIncludeAttributes) {
output.WriteRawTag(80);
output.WriteBool(IncludeAttributes);
}
if (AlarmBearingOnly != false) {
output.WriteRawTag(88);
output.WriteBool(AlarmBearingOnly);
}
if (HistorizedOnly != false) {
output.WriteRawTag(96);
output.WriteBool(HistorizedOnly);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
@@ -941,6 +1293,47 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
if (PageSize != 0) {
output.WriteRawTag(8);
output.WriteInt32(PageSize);
}
if (PageToken.Length != 0) {
output.WriteRawTag(18);
output.WriteString(PageToken);
}
if (HasRootGobjectId) {
output.WriteRawTag(24);
output.WriteInt32(RootGobjectId);
}
if (HasRootTagName) {
output.WriteRawTag(34);
output.WriteString(RootTagName);
}
if (HasRootContainedPath) {
output.WriteRawTag(42);
output.WriteString(RootContainedPath);
}
if (maxDepth_ != null) {
_single_maxDepth_codec.WriteTagAndValue(ref output, MaxDepth);
}
categoryIds_.WriteTo(ref output, _repeated_categoryIds_codec);
templateChainContains_.WriteTo(ref output, _repeated_templateChainContains_codec);
if (TagNameGlob.Length != 0) {
output.WriteRawTag(74);
output.WriteString(TagNameGlob);
}
if (HasIncludeAttributes) {
output.WriteRawTag(80);
output.WriteBool(IncludeAttributes);
}
if (AlarmBearingOnly != false) {
output.WriteRawTag(88);
output.WriteBool(AlarmBearingOnly);
}
if (HistorizedOnly != false) {
output.WriteRawTag(96);
output.WriteBool(HistorizedOnly);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
@@ -951,6 +1344,38 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int CalculateSize() {
int size = 0;
if (PageSize != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(PageSize);
}
if (PageToken.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(PageToken);
}
if (HasRootGobjectId) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(RootGobjectId);
}
if (HasRootTagName) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(RootTagName);
}
if (HasRootContainedPath) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(RootContainedPath);
}
if (maxDepth_ != null) {
size += _single_maxDepth_codec.CalculateSizeWithTag(MaxDepth);
}
size += categoryIds_.CalculateSize(_repeated_categoryIds_codec);
size += templateChainContains_.CalculateSize(_repeated_templateChainContains_codec);
if (TagNameGlob.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(TagNameGlob);
}
if (HasIncludeAttributes) {
size += 1 + 1;
}
if (AlarmBearingOnly != false) {
size += 1 + 1;
}
if (HistorizedOnly != false) {
size += 1 + 1;
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
@@ -963,6 +1388,43 @@ namespace MxGateway.Contracts.Proto.Galaxy {
if (other == null) {
return;
}
if (other.PageSize != 0) {
PageSize = other.PageSize;
}
if (other.PageToken.Length != 0) {
PageToken = other.PageToken;
}
if (other.maxDepth_ != null) {
if (maxDepth_ == null || other.MaxDepth != 0) {
MaxDepth = other.MaxDepth;
}
}
categoryIds_.Add(other.categoryIds_);
templateChainContains_.Add(other.templateChainContains_);
if (other.TagNameGlob.Length != 0) {
TagNameGlob = other.TagNameGlob;
}
if (other.HasIncludeAttributes) {
IncludeAttributes = other.IncludeAttributes;
}
if (other.AlarmBearingOnly != false) {
AlarmBearingOnly = other.AlarmBearingOnly;
}
if (other.HistorizedOnly != false) {
HistorizedOnly = other.HistorizedOnly;
}
switch (other.RootCase) {
case RootOneofCase.RootGobjectId:
RootGobjectId = other.RootGobjectId;
break;
case RootOneofCase.RootTagName:
RootTagName = other.RootTagName;
break;
case RootOneofCase.RootContainedPath:
RootContainedPath = other.RootContainedPath;
break;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
@@ -982,6 +1444,58 @@ namespace MxGateway.Contracts.Proto.Galaxy {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
case 8: {
PageSize = input.ReadInt32();
break;
}
case 18: {
PageToken = input.ReadString();
break;
}
case 24: {
RootGobjectId = input.ReadInt32();
break;
}
case 34: {
RootTagName = input.ReadString();
break;
}
case 42: {
RootContainedPath = input.ReadString();
break;
}
case 50: {
int? value = _single_maxDepth_codec.Read(input);
if (maxDepth_ == null || value != 0) {
MaxDepth = value;
}
break;
}
case 58:
case 56: {
categoryIds_.AddEntriesFrom(input, _repeated_categoryIds_codec);
break;
}
case 66: {
templateChainContains_.AddEntriesFrom(input, _repeated_templateChainContains_codec);
break;
}
case 74: {
TagNameGlob = input.ReadString();
break;
}
case 80: {
IncludeAttributes = input.ReadBool();
break;
}
case 88: {
AlarmBearingOnly = input.ReadBool();
break;
}
case 96: {
HistorizedOnly = input.ReadBool();
break;
}
}
}
#endif
@@ -1001,6 +1515,58 @@ namespace MxGateway.Contracts.Proto.Galaxy {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
case 8: {
PageSize = input.ReadInt32();
break;
}
case 18: {
PageToken = input.ReadString();
break;
}
case 24: {
RootGobjectId = input.ReadInt32();
break;
}
case 34: {
RootTagName = input.ReadString();
break;
}
case 42: {
RootContainedPath = input.ReadString();
break;
}
case 50: {
int? value = _single_maxDepth_codec.Read(ref input);
if (maxDepth_ == null || value != 0) {
MaxDepth = value;
}
break;
}
case 58:
case 56: {
categoryIds_.AddEntriesFrom(ref input, _repeated_categoryIds_codec);
break;
}
case 66: {
templateChainContains_.AddEntriesFrom(ref input, _repeated_templateChainContains_codec);
break;
}
case 74: {
TagNameGlob = input.ReadString();
break;
}
case 80: {
IncludeAttributes = input.ReadBool();
break;
}
case 88: {
AlarmBearingOnly = input.ReadBool();
break;
}
case 96: {
HistorizedOnly = input.ReadBool();
break;
}
}
}
}
@@ -1044,6 +1610,8 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public DiscoverHierarchyReply(DiscoverHierarchyReply other) : this() {
objects_ = other.objects_.Clone();
nextPageToken_ = other.nextPageToken_;
totalObjectCount_ = other.totalObjectCount_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
@@ -1064,6 +1632,36 @@ namespace MxGateway.Contracts.Proto.Galaxy {
get { return objects_; }
}
/// <summary>Field number for the "next_page_token" field.</summary>
public const int NextPageTokenFieldNumber = 2;
private string nextPageToken_ = "";
/// <summary>
/// Non-empty when another page is available.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string NextPageToken {
get { return nextPageToken_; }
set {
nextPageToken_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
/// <summary>Field number for the "total_object_count" field.</summary>
public const int TotalObjectCountFieldNumber = 3;
private int totalObjectCount_;
/// <summary>
/// Total number of objects in the cached hierarchy at the time of the call.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int TotalObjectCount {
get { return totalObjectCount_; }
set {
totalObjectCount_ = value;
}
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
@@ -1080,6 +1678,8 @@ namespace MxGateway.Contracts.Proto.Galaxy {
return true;
}
if(!objects_.Equals(other.objects_)) return false;
if (NextPageToken != other.NextPageToken) return false;
if (TotalObjectCount != other.TotalObjectCount) return false;
return Equals(_unknownFields, other._unknownFields);
}
@@ -1088,6 +1688,8 @@ namespace MxGateway.Contracts.Proto.Galaxy {
public override int GetHashCode() {
int hash = 1;
hash ^= objects_.GetHashCode();
if (NextPageToken.Length != 0) hash ^= NextPageToken.GetHashCode();
if (TotalObjectCount != 0) hash ^= TotalObjectCount.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
@@ -1107,6 +1709,14 @@ namespace MxGateway.Contracts.Proto.Galaxy {
output.WriteRawMessage(this);
#else
objects_.WriteTo(output, _repeated_objects_codec);
if (NextPageToken.Length != 0) {
output.WriteRawTag(18);
output.WriteString(NextPageToken);
}
if (TotalObjectCount != 0) {
output.WriteRawTag(24);
output.WriteInt32(TotalObjectCount);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
@@ -1118,6 +1728,14 @@ namespace MxGateway.Contracts.Proto.Galaxy {
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
objects_.WriteTo(ref output, _repeated_objects_codec);
if (NextPageToken.Length != 0) {
output.WriteRawTag(18);
output.WriteString(NextPageToken);
}
if (TotalObjectCount != 0) {
output.WriteRawTag(24);
output.WriteInt32(TotalObjectCount);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
@@ -1129,6 +1747,12 @@ namespace MxGateway.Contracts.Proto.Galaxy {
public int CalculateSize() {
int size = 0;
size += objects_.CalculateSize(_repeated_objects_codec);
if (NextPageToken.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(NextPageToken);
}
if (TotalObjectCount != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(TotalObjectCount);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
@@ -1142,6 +1766,12 @@ namespace MxGateway.Contracts.Proto.Galaxy {
return;
}
objects_.Add(other.objects_);
if (other.NextPageToken.Length != 0) {
NextPageToken = other.NextPageToken;
}
if (other.TotalObjectCount != 0) {
TotalObjectCount = other.TotalObjectCount;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
@@ -1165,6 +1795,14 @@ namespace MxGateway.Contracts.Proto.Galaxy {
objects_.AddEntriesFrom(input, _repeated_objects_codec);
break;
}
case 18: {
NextPageToken = input.ReadString();
break;
}
case 24: {
TotalObjectCount = input.ReadInt32();
break;
}
}
}
#endif
@@ -1188,6 +1826,14 @@ namespace MxGateway.Contracts.Proto.Galaxy {
objects_.AddEntriesFrom(ref input, _repeated_objects_codec);
break;
}
case 18: {
NextPageToken = input.ReadString();
break;
}
case 24: {
TotalObjectCount = input.ReadInt32();
break;
}
}
}
}
@@ -5,6 +5,7 @@ package galaxy_repository.v1;
option csharp_namespace = "MxGateway.Contracts.Proto.Galaxy";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
// Read-only browse over the AVEVA System Platform Galaxy Repository (ZB SQL
// database). Lets clients enumerate the deployed object hierarchy and each
@@ -37,10 +38,42 @@ message GetLastDeployTimeReply {
google.protobuf.Timestamp time_of_last_deploy = 2;
}
message DiscoverHierarchyRequest {}
message DiscoverHierarchyRequest {
// Maximum number of objects to return. The server applies its default when
// unset and rejects non-positive values.
int32 page_size = 1;
// Opaque token returned by a previous DiscoverHierarchy response.
string page_token = 2;
// Optional. When set, return only this object and its descendants.
// Empty = full hierarchy.
oneof root {
int32 root_gobject_id = 3;
string root_tag_name = 4;
string root_contained_path = 5;
}
// Optional. Cap on descendant depth from root. Zero returns only the root.
// Unset means unlimited depth.
google.protobuf.Int32Value max_depth = 6;
// Optional object category id filters.
repeated int32 category_ids = 7;
// Optional case-insensitive substring filters against template names.
repeated string template_chain_contains = 8;
// Optional anchored, case-insensitive glob over object tag_name.
string tag_name_glob = 9;
// Optional. Unset or true includes attributes. False returns object skeletons.
optional bool include_attributes = 10;
// Optional. Return only objects with at least one alarm-bearing attribute.
bool alarm_bearing_only = 11;
// Optional. Return only objects with at least one historized attribute.
bool historized_only = 12;
}
message DiscoverHierarchyReply {
repeated GalaxyObject objects = 1;
// Non-empty when another page is available.
string next_page_token = 2;
// Total number of objects in the cached hierarchy at the time of the call.
int32 total_object_count = 3;
}
message WatchDeployEventsRequest {
@@ -9,6 +9,7 @@ using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
@@ -248,6 +249,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
Service = new MxAccessGatewayService(
sessionManager,
new GatewayRequestIdentityAccessor(),
new AllowAllConstraintEnforcer(),
new MxAccessGrpcRequestValidator(),
mapper,
eventStreamService,
@@ -515,4 +517,33 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
}
}
}
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
}
@@ -1,3 +1,5 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveProtocolConfiguration(uint WorkerProtocolVersion);
public sealed record EffectiveProtocolConfiguration(
uint WorkerProtocolVersion,
int MaxGrpcMessageBytes);
@@ -3,4 +3,7 @@ namespace MxGateway.Server.Configuration;
public sealed record EffectiveSessionConfiguration(
int DefaultCommandTimeoutSeconds,
int MaxSessions,
int MaxPendingCommandsPerSession,
int DefaultLeaseSeconds,
int LeaseSweepIntervalSeconds,
bool AllowMultipleEventSubscribers);
@@ -28,6 +28,9 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
Sessions: new EffectiveSessionConfiguration(
DefaultCommandTimeoutSeconds: value.Sessions.DefaultCommandTimeoutSeconds,
MaxSessions: value.Sessions.MaxSessions,
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
@@ -41,6 +44,8 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
RecentFaultLimit: value.Dashboard.RecentFaultLimit,
RecentSessionLimit: value.Dashboard.RecentSessionLimit,
ShowTagValues: value.Dashboard.ShowTagValues),
Protocol: new EffectiveProtocolConfiguration(value.Protocol.WorkerProtocolVersion));
Protocol: new EffectiveProtocolConfiguration(
value.Protocol.WorkerProtocolVersion,
value.Protocol.MaxGrpcMessageBytes));
}
}
@@ -129,6 +129,14 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
options.MaxPendingCommandsPerSession,
"MxGateway:Sessions:MaxPendingCommandsPerSession must be greater than zero.",
failures);
AddIfNotPositive(
options.DefaultLeaseSeconds,
"MxGateway:Sessions:DefaultLeaseSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.LeaseSweepIntervalSeconds,
"MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.",
failures);
if (options.AllowMultipleEventSubscribers)
{
@@ -179,6 +187,12 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
failures.Add(
$"MxGateway:Protocol:WorkerProtocolVersion must be {GatewayContractInfo.WorkerProtocolVersion}.");
}
if (options.MaxGrpcMessageBytes is < MinimumMaxMessageBytes or > MaximumMaxMessageBytes)
{
failures.Add(
$"MxGateway:Protocol:MaxGrpcMessageBytes must be between {MinimumMaxMessageBytes} and {MaximumMaxMessageBytes}.");
}
}
private static void AddIfBlank(string? value, string message, List<string> failures)
@@ -5,4 +5,6 @@ namespace MxGateway.Server.Configuration;
public sealed class ProtocolOptions
{
public uint WorkerProtocolVersion { get; init; } = GatewayContractInfo.WorkerProtocolVersion;
public int MaxGrpcMessageBytes { get; init; } = 16 * 1024 * 1024;
}
@@ -8,5 +8,9 @@ public sealed class SessionOptions
public int MaxPendingCommandsPerSession { get; init; } = 128;
public int DefaultLeaseSeconds { get; init; } = 1800;
public int LeaseSweepIntervalSeconds { get; init; } = 30;
public bool AllowMultipleEventSubscribers { get; init; }
}
@@ -26,6 +26,9 @@
<li class="nav-item">
<NavLink class="nav-link" href="galaxy">Galaxy</NavLink>
</li>
<li class="nav-item">
<NavLink class="nav-link" href="apikeys">API Keys</NavLink>
</li>
<li class="nav-item">
<NavLink class="nav-link" href="settings">Settings</NavLink>
</li>
@@ -0,0 +1,99 @@
@page "/apikeys"
@page "/dashboard/apikeys"
@inherits DashboardPageBase
<PageTitle>Dashboard API Keys</PageTitle>
@if (Snapshot is null)
{
<div class="empty-state">Loading API keys.</div>
}
else
{
<div class="dashboard-page-header">
<div>
<h1>API Keys</h1>
<div class="text-secondary">@Snapshot.ApiKeys.Count key rows</div>
</div>
</div>
<section class="dashboard-section">
@if (Snapshot.ApiKeys.Count == 0)
{
<div class="empty-state">No API keys are available for display.</div>
}
else
{
<div class="table-responsive">
<table class="table table-sm align-middle dashboard-table">
<thead>
<tr>
<th scope="col">Key</th>
<th scope="col">Status</th>
<th scope="col">Display Name</th>
<th scope="col">Scopes</th>
<th scope="col">Constraints</th>
<th scope="col">Created</th>
<th scope="col">Last Used</th>
</tr>
</thead>
<tbody>
@foreach (DashboardApiKeySummary key in Snapshot.ApiKeys)
{
<tr>
<td><code>@key.KeyId</code></td>
<td><StatusBadge Text="@(key.RevokedUtc is null ? "Active" : "Revoked")" /></td>
<td>@DashboardDisplay.Text(key.DisplayName)</td>
<td>@DashboardDisplay.Text(string.Join(", ", key.Scopes.Order(StringComparer.Ordinal)))</td>
<td>@DashboardDisplay.Text(ConstraintText(key.Constraints))</td>
<td>@DashboardDisplay.DateTime(key.CreatedUtc)</td>
<td>@DashboardDisplay.DateTime(key.LastUsedUtc)</td>
</tr>
}
</tbody>
</table>
</div>
}
</section>
}
@code {
private static string ConstraintText(MxGateway.Server.Security.Authentication.ApiKeyConstraints constraints)
{
if (constraints.IsEmpty)
{
return "unconstrained";
}
List<string> parts = [];
AddList(parts, "read_subtrees", constraints.ReadSubtrees);
AddList(parts, "write_subtrees", constraints.WriteSubtrees);
AddList(parts, "read_tag_globs", constraints.ReadTagGlobs);
AddList(parts, "write_tag_globs", constraints.WriteTagGlobs);
AddList(parts, "browse_subtrees", constraints.BrowseSubtrees);
if (constraints.MaxWriteClassification is { } max)
{
parts.Add($"max_write_classification={max}");
}
if (constraints.ReadAlarmOnly)
{
parts.Add("read_alarm_only");
}
if (constraints.ReadHistorizedOnly)
{
parts.Add("read_historized_only");
}
return string.Join("; ", parts);
}
private static void AddList(List<string> parts, string name, IReadOnlyList<string> values)
{
if (values.Count > 0)
{
parts.Add($"{name}=[{string.Join(", ", values)}]");
}
}
}
@@ -190,6 +190,8 @@ else
private int CommandTimeoutSeconds() => GalaxyOptions.Value.CommandTimeoutSeconds;
private string? GalaxyConnectionStringDisplay() =>
DashboardRedactor.Redact(GalaxyOptions.Value.ConnectionString);
private string GalaxyConnectionStringDisplay()
{
return DashboardConnectionStringDisplay.GalaxyRepositoryConnectionString(GalaxyOptions.Value.ConnectionString);
}
}
@@ -0,0 +1,12 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardApiKeySummary(
string KeyId,
string DisplayName,
IReadOnlySet<string> Scopes,
ApiKeyConstraints Constraints,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,28 @@
using Microsoft.Data.SqlClient;
namespace MxGateway.Server.Dashboard;
public static class DashboardConnectionStringDisplay
{
public static string GalaxyRepositoryConnectionString(string connectionString)
{
try
{
SqlConnectionStringBuilder builder = new(connectionString);
SqlConnectionStringBuilder display = new()
{
DataSource = builder.DataSource,
InitialCatalog = builder.InitialCatalog,
IntegratedSecurity = builder.IntegratedSecurity,
Encrypt = builder.Encrypt,
TrustServerCertificate = builder.TrustServerCertificate,
};
return display.ConnectionString;
}
catch (ArgumentException)
{
return "[invalid connection string]";
}
}
}
@@ -2,97 +2,11 @@ using MxGateway.Server.Galaxy;
namespace MxGateway.Server.Dashboard;
/// <summary>
/// Projects a <see cref="GalaxyHierarchyCacheEntry"/> into a
/// <see cref="DashboardGalaxySummary"/> for the Blazor pages. Top-templates and
/// per-category breakdowns are computed here rather than stored on the cache so the
/// Galaxy namespace stays free of dashboard-presentation concepts.
/// </summary>
/// <summary>Projects the precomputed Galaxy cache dashboard summary.</summary>
internal static class DashboardGalaxyProjector
{
private const int TopTemplatesLimit = 10;
private static readonly IReadOnlyDictionary<int, string> CategoryNamesById = new Dictionary<int, string>
{
[1] = "WinPlatform",
[3] = "AppEngine",
[4] = "InTouchViewApp",
[10] = "UserDefined",
[11] = "FieldReference",
[13] = "Area",
[17] = "DIObject",
[24] = "DDESuiteLinkClient",
[26] = "OPCClient",
};
public static DashboardGalaxySummary Project(GalaxyHierarchyCacheEntry entry)
{
DashboardGalaxyStatus status = entry.Status switch
{
GalaxyCacheStatus.Healthy => DashboardGalaxyStatus.Healthy,
GalaxyCacheStatus.Stale => DashboardGalaxyStatus.Stale,
GalaxyCacheStatus.Unavailable => DashboardGalaxyStatus.Unavailable,
_ => DashboardGalaxyStatus.Unknown,
};
IReadOnlyList<DashboardGalaxyTemplateUsage> topTemplates;
IReadOnlyList<DashboardGalaxyCategoryCount> objectCategories;
if (entry.Hierarchy.Count == 0)
{
topTemplates = Array.Empty<DashboardGalaxyTemplateUsage>();
objectCategories = Array.Empty<DashboardGalaxyCategoryCount>();
}
else
{
Dictionary<int, int> objectsByCategory = new();
Dictionary<string, int> templateUsage = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyHierarchyRow row in entry.Hierarchy)
{
objectsByCategory.TryGetValue(row.CategoryId, out int categoryCount);
objectsByCategory[row.CategoryId] = categoryCount + 1;
if (row.TemplateChain.Count > 0)
{
string immediate = row.TemplateChain[0];
if (!string.IsNullOrWhiteSpace(immediate))
{
templateUsage.TryGetValue(immediate, out int templateCount);
templateUsage[immediate] = templateCount + 1;
}
}
}
topTemplates = templateUsage
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Take(TopTemplatesLimit)
.Select(entry => new DashboardGalaxyTemplateUsage(entry.Key, entry.Value))
.ToArray();
objectCategories = objectsByCategory
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key)
.Select(entry => new DashboardGalaxyCategoryCount(
entry.Key,
CategoryNamesById.TryGetValue(entry.Key, out string? name) ? name : $"Category {entry.Key}",
entry.Value))
.ToArray();
}
return new DashboardGalaxySummary(
Status: status,
LastQueriedAt: entry.LastQueriedAt,
LastSuccessAt: entry.LastSuccessAt,
LastDeployTime: entry.LastDeployTime,
LastError: entry.LastError,
ObjectCount: entry.ObjectCount,
AreaCount: entry.AreaCount,
AttributeCount: entry.AttributeCount,
HistorizedAttributeCount: entry.HistorizedAttributeCount,
AlarmAttributeCount: entry.AlarmAttributeCount,
TopTemplates: topTemplates,
ObjectCategories: objectCategories);
return entry.DashboardSummary;
}
}
@@ -12,5 +12,6 @@ public sealed record DashboardSnapshot(
IReadOnlyList<DashboardWorkerSummary> Workers,
IReadOnlyList<DashboardMetricSummary> Metrics,
IReadOnlyList<DashboardFaultSummary> Faults,
IReadOnlyList<DashboardApiKeySummary> ApiKeys,
EffectiveGatewayConfiguration Configuration,
DashboardGalaxySummary Galaxy);
@@ -1,8 +1,11 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Galaxy;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
@@ -16,24 +19,32 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
private readonly GatewayMetrics _metrics;
private readonly IGatewayConfigurationProvider _configurationProvider;
private readonly IGalaxyHierarchyCache _galaxyHierarchyCache;
private readonly IApiKeyAdminStore _apiKeyAdminStore;
private readonly TimeProvider _timeProvider;
private readonly DateTimeOffset _gatewayStartedAt;
private readonly TimeSpan _snapshotInterval;
private readonly TimeSpan _apiKeySummaryRefreshTimeout = TimeSpan.FromSeconds(2);
private readonly int _recentFaultLimit;
private readonly int _recentSessionLimit;
private readonly ILogger<DashboardSnapshotService> _logger;
private readonly SemaphoreSlim _apiKeySummaryRefreshGate = new(1, 1);
private IReadOnlyList<DashboardApiKeySummary> _apiKeySummaries = Array.Empty<DashboardApiKeySummary>();
public DashboardSnapshotService(
ISessionRegistry sessionRegistry,
GatewayMetrics metrics,
IGatewayConfigurationProvider configurationProvider,
IGalaxyHierarchyCache galaxyHierarchyCache,
IApiKeyAdminStore apiKeyAdminStore,
IOptions<GatewayOptions> options,
TimeProvider? timeProvider = null)
TimeProvider? timeProvider = null,
ILogger<DashboardSnapshotService>? logger = null)
{
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
_galaxyHierarchyCache = galaxyHierarchyCache ?? throw new ArgumentNullException(nameof(galaxyHierarchyCache));
_apiKeyAdminStore = apiKeyAdminStore ?? throw new ArgumentNullException(nameof(apiKeyAdminStore));
ArgumentNullException.ThrowIfNull(options);
_timeProvider = timeProvider ?? TimeProvider.System;
@@ -41,6 +52,7 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
_logger = logger ?? NullLogger<DashboardSnapshotService>.Instance;
}
public DashboardSnapshot GetSnapshot()
@@ -69,6 +81,7 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
Workers: workerSummaries,
Metrics: CreateMetricSummaries(metricsSnapshot),
Faults: CreateFaultSummaries(sessions, generatedAt),
ApiKeys: Volatile.Read(ref _apiKeySummaries),
Configuration: _configurationProvider.GetEffectiveConfiguration(),
Galaxy: DashboardGalaxyProjector.Project(_galaxyHierarchyCache.Current));
}
@@ -81,6 +94,7 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
yield break;
}
await RefreshApiKeySummariesAsync(cancellationToken).ConfigureAwait(false);
yield return GetSnapshot();
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
@@ -101,6 +115,7 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
yield break;
}
await RefreshApiKeySummariesAsync(cancellationToken).ConfigureAwait(false);
yield return GetSnapshot();
}
}
@@ -192,6 +207,51 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService
.ToArray();
}
private async Task RefreshApiKeySummariesAsync(CancellationToken cancellationToken)
{
if (!await _apiKeySummaryRefreshGate.WaitAsync(0, cancellationToken).ConfigureAwait(false))
{
return;
}
try
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(_apiKeySummaryRefreshTimeout);
IReadOnlyList<DashboardApiKeySummary> summaries = (await _apiKeyAdminStore.ListAsync(timeout.Token)
.ConfigureAwait(false))
.Select(key => new DashboardApiKeySummary(
KeyId: key.KeyId,
DisplayName: key.DisplayName,
Scopes: key.Scopes,
Constraints: key.Constraints,
CreatedUtc: key.CreatedUtc,
LastUsedUtc: key.LastUsedUtc,
RevokedUtc: key.RevokedUtc))
.ToArray();
Volatile.Write(ref _apiKeySummaries, summaries);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (OperationCanceledException)
{
_logger.LogWarning(
"Timed out refreshing dashboard API key summaries after {Timeout}.",
_apiKeySummaryRefreshTimeout);
}
catch (Exception)
{
_logger.LogWarning("Failed to refresh dashboard API key summaries.");
}
finally
{
_apiKeySummaryRefreshGate.Release();
}
}
private static bool HasFault(GatewaySession session)
{
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
@@ -0,0 +1,44 @@
using System.Text;
using System.Text.RegularExpressions;
namespace MxGateway.Server.Galaxy;
public static class GalaxyGlobMatcher
{
public static bool IsMatch(string value, string glob)
{
if (string.IsNullOrWhiteSpace(glob))
{
return true;
}
return Regex.IsMatch(
value ?? string.Empty,
BuildRegex(glob),
RegexOptions.CultureInvariant | RegexOptions.IgnoreCase,
TimeSpan.FromMilliseconds(100));
}
private static string BuildRegex(string glob)
{
StringBuilder builder = new("^", glob.Length + 2);
foreach (char character in glob)
{
switch (character)
{
case '*':
builder.Append(".*");
break;
case '?':
builder.Append('.');
break;
default:
builder.Append(Regex.Escape(character.ToString()));
break;
}
}
builder.Append('$');
return builder.ToString();
}
}
@@ -2,6 +2,7 @@ using Google.Protobuf.WellKnownTypes;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Grpc;
namespace MxGateway.Server.Galaxy;
@@ -43,7 +44,16 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
{
GalaxyHierarchyCacheEntry snapshot = Volatile.Read(ref _current);
GalaxyCacheStatus projected = ProjectStatus(snapshot);
return projected == snapshot.Status ? snapshot : snapshot with { Status = projected };
return projected == snapshot.Status
? snapshot
: snapshot with
{
Status = projected,
DashboardSummary = snapshot.DashboardSummary with
{
Status = MapDashboardStatus(projected),
},
};
}
}
@@ -89,6 +99,14 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
LastQueriedAt = queriedAt,
LastSuccessAt = queriedAt,
LastError = null,
DashboardSummary = previous.DashboardSummary with
{
Status = DashboardGalaxyStatus.Healthy,
LastQueriedAt = queriedAt,
LastSuccessAt = queriedAt,
LastDeployTime = deployTime,
LastError = null,
},
};
Volatile.Write(ref _current, refreshed);
_firstLoad.TrySetResult();
@@ -101,11 +119,24 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
List<GalaxyHierarchyRow> hierarchy = hierarchyTask.Result;
List<GalaxyAttributeRow> attributes = attributesTask.Result;
DiscoverHierarchyReply reply = BuildReply(hierarchy, attributes);
IReadOnlyList<GalaxyObject> objects = BuildObjects(hierarchy, attributes);
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build(objects);
int areaCount = hierarchy.Count(row => row.IsArea);
int historized = attributes.Count(row => row.IsHistorized);
int alarms = attributes.Count(row => row.IsAlarm);
DashboardGalaxySummary dashboardSummary = BuildDashboardSummary(
status: GalaxyCacheStatus.Healthy,
lastQueriedAt: queriedAt,
lastSuccessAt: queriedAt,
lastDeployTime: deployTime,
lastError: null,
hierarchy: hierarchy,
objectCount: hierarchy.Count,
areaCount: areaCount,
attributeCount: attributes.Count,
historizedAttributeCount: historized,
alarmAttributeCount: alarms);
long nextSequence = previous.Sequence + 1;
GalaxyHierarchyCacheEntry next = new(
@@ -115,9 +146,9 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
LastSuccessAt: queriedAt,
LastDeployTime: deployTime,
LastError: null,
Hierarchy: hierarchy,
Attributes: attributes,
Reply: reply,
Objects: objects,
Index: index,
DashboardSummary: dashboardSummary,
ObjectCount: hierarchy.Count,
AreaCount: areaCount,
AttributeCount: attributes.Count,
@@ -146,13 +177,19 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
Status = previous.HasData ? GalaxyCacheStatus.Stale : GalaxyCacheStatus.Unavailable,
LastQueriedAt = queriedAt,
LastError = exception.Message,
DashboardSummary = previous.DashboardSummary with
{
Status = MapDashboardStatus(previous.HasData ? GalaxyCacheStatus.Stale : GalaxyCacheStatus.Unavailable),
LastQueriedAt = queriedAt,
LastError = exception.Message,
},
};
Volatile.Write(ref _current, failed);
_firstLoad.TrySetResult();
}
}
private static DiscoverHierarchyReply BuildReply(
private static IReadOnlyList<GalaxyObject> BuildObjects(
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
IReadOnlyList<GalaxyAttributeRow> attributes)
{
@@ -160,14 +197,110 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
.GroupBy(a => a.GobjectId)
.ToDictionary(g => g.Key, g => g.ToList());
DiscoverHierarchyReply reply = new();
List<GalaxyObject> objects = new(hierarchy.Count);
foreach (GalaxyHierarchyRow row in hierarchy)
{
reply.Objects.Add(GalaxyProtoMapper.MapObject(row, attributesByGobjectId));
objects.Add(GalaxyProtoMapper.MapObject(row, attributesByGobjectId));
}
return reply;
return objects;
}
private static DashboardGalaxySummary BuildDashboardSummary(
GalaxyCacheStatus status,
DateTimeOffset? lastQueriedAt,
DateTimeOffset? lastSuccessAt,
DateTimeOffset? lastDeployTime,
string? lastError,
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
int objectCount,
int areaCount,
int attributeCount,
int historizedAttributeCount,
int alarmAttributeCount)
{
IReadOnlyList<DashboardGalaxyTemplateUsage> topTemplates;
IReadOnlyList<DashboardGalaxyCategoryCount> objectCategories;
if (hierarchy.Count == 0)
{
topTemplates = Array.Empty<DashboardGalaxyTemplateUsage>();
objectCategories = Array.Empty<DashboardGalaxyCategoryCount>();
}
else
{
Dictionary<int, int> objectsByCategory = new();
Dictionary<string, int> templateUsage = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyHierarchyRow row in hierarchy)
{
objectsByCategory.TryGetValue(row.CategoryId, out int categoryCount);
objectsByCategory[row.CategoryId] = categoryCount + 1;
if (row.TemplateChain.Count > 0)
{
string immediate = row.TemplateChain[0];
if (!string.IsNullOrWhiteSpace(immediate))
{
templateUsage.TryGetValue(immediate, out int templateCount);
templateUsage[immediate] = templateCount + 1;
}
}
}
topTemplates = templateUsage
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Take(10)
.Select(entry => new DashboardGalaxyTemplateUsage(entry.Key, entry.Value))
.ToArray();
objectCategories = objectsByCategory
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key)
.Select(entry => new DashboardGalaxyCategoryCount(
entry.Key,
ResolveCategoryName(entry.Key),
entry.Value))
.ToArray();
}
return new DashboardGalaxySummary(
Status: MapDashboardStatus(status),
LastQueriedAt: lastQueriedAt,
LastSuccessAt: lastSuccessAt,
LastDeployTime: lastDeployTime,
LastError: lastError,
ObjectCount: objectCount,
AreaCount: areaCount,
AttributeCount: attributeCount,
HistorizedAttributeCount: historizedAttributeCount,
AlarmAttributeCount: alarmAttributeCount,
TopTemplates: topTemplates,
ObjectCategories: objectCategories);
}
private static DashboardGalaxyStatus MapDashboardStatus(GalaxyCacheStatus status) => status switch
{
GalaxyCacheStatus.Healthy => DashboardGalaxyStatus.Healthy,
GalaxyCacheStatus.Stale => DashboardGalaxyStatus.Stale,
GalaxyCacheStatus.Unavailable => DashboardGalaxyStatus.Unavailable,
_ => DashboardGalaxyStatus.Unknown,
};
private static string ResolveCategoryName(int categoryId) => categoryId switch
{
1 => "WinPlatform",
3 => "AppEngine",
4 => "InTouchViewApp",
10 => "UserDefined",
11 => "FieldReference",
13 => "Area",
17 => "DIObject",
24 => "DDESuiteLinkClient",
26 => "OPCClient",
_ => $"Category {categoryId}",
};
private GalaxyCacheStatus ProjectStatus(GalaxyHierarchyCacheEntry snapshot)
{
if (snapshot.Status is GalaxyCacheStatus.Unknown or GalaxyCacheStatus.Unavailable)
@@ -1,11 +1,12 @@
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Server.Dashboard;
namespace MxGateway.Server.Galaxy;
/// <summary>
/// Immutable snapshot of the Galaxy Repository browse data held by
/// <see cref="GalaxyHierarchyCache"/>. Multiple gRPC clients share the same instance —
/// the materialized <see cref="Reply"/> is produced once per refresh and reused.
/// <see cref="GalaxyHierarchyCache"/>. Multiple gRPC clients share the same
/// materialized object list and precomputed dashboard projection.
/// </summary>
public sealed record GalaxyHierarchyCacheEntry(
GalaxyCacheStatus Status,
@@ -14,9 +15,9 @@ public sealed record GalaxyHierarchyCacheEntry(
DateTimeOffset? LastSuccessAt,
DateTimeOffset? LastDeployTime,
string? LastError,
IReadOnlyList<GalaxyHierarchyRow> Hierarchy,
IReadOnlyList<GalaxyAttributeRow> Attributes,
DiscoverHierarchyReply? Reply,
IReadOnlyList<GalaxyObject> Objects,
GalaxyHierarchyIndex Index,
DashboardGalaxySummary DashboardSummary,
int ObjectCount,
int AreaCount,
int AttributeCount,
@@ -30,9 +31,9 @@ public sealed record GalaxyHierarchyCacheEntry(
LastSuccessAt: null,
LastDeployTime: null,
LastError: null,
Hierarchy: Array.Empty<GalaxyHierarchyRow>(),
Attributes: Array.Empty<GalaxyAttributeRow>(),
Reply: null,
Objects: Array.Empty<GalaxyObject>(),
Index: GalaxyHierarchyIndex.Empty,
DashboardSummary: DashboardGalaxySummary.Unknown,
ObjectCount: 0,
AreaCount: 0,
AttributeCount: 0,
@@ -0,0 +1,106 @@
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Server.Galaxy;
public sealed class GalaxyHierarchyIndex
{
private GalaxyHierarchyIndex(
IReadOnlyList<GalaxyObjectView> objectViews,
IReadOnlyDictionary<int, GalaxyObjectView> objectViewsById,
IReadOnlyDictionary<string, GalaxyTagLookup> tagsByAddress)
{
ObjectViews = objectViews;
ObjectViewsById = objectViewsById;
TagsByAddress = tagsByAddress;
}
public static GalaxyHierarchyIndex Empty { get; } = new(
Array.Empty<GalaxyObjectView>(),
new Dictionary<int, GalaxyObjectView>(),
new Dictionary<string, GalaxyTagLookup>(StringComparer.OrdinalIgnoreCase));
public IReadOnlyList<GalaxyObjectView> ObjectViews { get; }
public IReadOnlyDictionary<int, GalaxyObjectView> ObjectViewsById { get; }
public IReadOnlyDictionary<string, GalaxyTagLookup> TagsByAddress { get; }
public static GalaxyHierarchyIndex Build(IReadOnlyList<GalaxyObject> objects)
{
if (objects.Count == 0)
{
return Empty;
}
Dictionary<int, GalaxyObject> objectsById = new();
foreach (GalaxyObject obj in objects)
{
objectsById.TryAdd(obj.GobjectId, obj);
}
List<GalaxyObjectView> views = new(objects.Count);
Dictionary<int, GalaxyObjectView> viewsById = new();
Dictionary<string, GalaxyTagLookup> tagsByAddress = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyObject obj in objects)
{
string path = BuildContainedPath(obj, objectsById);
int depth = string.IsNullOrWhiteSpace(path) ? 0 : path.Count(character => character == '/');
GalaxyObjectView view = new(obj, path, depth);
views.Add(view);
viewsById.TryAdd(obj.GobjectId, view);
if (!string.IsNullOrWhiteSpace(obj.TagName))
{
tagsByAddress.TryAdd(obj.TagName, new GalaxyTagLookup(obj, Attribute: null, path));
}
foreach (GalaxyAttribute attribute in obj.Attributes)
{
if (!string.IsNullOrWhiteSpace(attribute.FullTagReference))
{
tagsByAddress.TryAdd(attribute.FullTagReference, new GalaxyTagLookup(obj, attribute, path));
}
}
}
return new GalaxyHierarchyIndex(
views,
viewsById,
tagsByAddress);
}
private static string BuildContainedPath(
GalaxyObject obj,
IReadOnlyDictionary<int, GalaxyObject> objectsById)
{
Stack<string> names = new();
HashSet<int> seen = [];
GalaxyObject? current = obj;
while (current is not null && seen.Add(current.GobjectId))
{
names.Push(ResolvePathSegment(current));
current = current.ParentGobjectId != 0
&& objectsById.TryGetValue(current.ParentGobjectId, out GalaxyObject? parent)
? parent
: null;
}
return string.Join('/', names.Where(name => !string.IsNullOrWhiteSpace(name)));
}
private static string ResolvePathSegment(GalaxyObject obj)
{
if (!string.IsNullOrWhiteSpace(obj.ContainedName))
{
return obj.ContainedName;
}
if (!string.IsNullOrWhiteSpace(obj.BrowseName))
{
return obj.BrowseName;
}
return obj.TagName;
}
}
@@ -0,0 +1,246 @@
using System.Security.Cryptography;
using System.Text;
using Grpc.Core;
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Server.Galaxy;
public static class GalaxyHierarchyProjector
{
public static GalaxyHierarchyQueryResult Project(
GalaxyHierarchyCacheEntry entry,
DiscoverHierarchyRequest request,
IReadOnlyList<string>? browseSubtreeGlobs = null)
{
return Project(
entry,
request,
browseSubtreeGlobs,
offset: 0,
pageSize: int.MaxValue);
}
public static GalaxyHierarchyQueryResult Project(
GalaxyHierarchyCacheEntry entry,
DiscoverHierarchyRequest request,
IReadOnlyList<string>? browseSubtreeGlobs,
int offset,
int pageSize)
{
ArgumentNullException.ThrowIfNull(entry);
ArgumentNullException.ThrowIfNull(request);
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset), offset, "Offset must be greater than or equal to zero.");
}
if (pageSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(pageSize), pageSize, "Page size must be greater than zero.");
}
IReadOnlyList<GalaxyObjectView> views = entry.Index.ObjectViews;
GalaxyObjectView? root = ResolveRoot(request, views);
int? maxDepth = request.MaxDepth;
if (maxDepth < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy max_depth must be greater than or equal to zero when provided."));
}
List<GalaxyObject> page = [];
int matchedCount = 0;
bool includeAttributes = IncludeAttributes(request);
foreach (GalaxyObjectView view in views)
{
if (!MatchesRoot(view, root, maxDepth)
|| !MatchesBrowseSubtrees(view, browseSubtreeGlobs)
|| !MatchesFilters(view.Object, request))
{
continue;
}
if (matchedCount >= offset && page.Count < pageSize)
{
page.Add(CloneObject(view.Object, includeAttributes));
}
matchedCount++;
}
return new GalaxyHierarchyQueryResult(
page,
matchedCount,
ComputeFilterSignature(request, browseSubtreeGlobs));
}
public static GalaxyObject? FindObjectForTag(
GalaxyHierarchyCacheEntry entry,
string tagAddress)
{
if (string.IsNullOrWhiteSpace(tagAddress))
{
return null;
}
return entry.Index.TagsByAddress.TryGetValue(tagAddress, out GalaxyTagLookup? lookup)
? lookup.Object
: null;
}
public static GalaxyAttribute? FindAttributeForTag(
GalaxyHierarchyCacheEntry entry,
string tagAddress)
{
if (string.IsNullOrWhiteSpace(tagAddress))
{
return null;
}
return entry.Index.TagsByAddress.TryGetValue(tagAddress, out GalaxyTagLookup? lookup)
? lookup.Attribute
: null;
}
public static string GetContainedPath(
GalaxyHierarchyCacheEntry entry,
int gobjectId)
{
return entry.Index.ObjectViewsById.TryGetValue(gobjectId, out GalaxyObjectView? view)
? view.ContainedPath
: string.Empty;
}
private static GalaxyObjectView? ResolveRoot(
DiscoverHierarchyRequest request,
IReadOnlyList<GalaxyObjectView> views)
{
GalaxyObjectView? root = request.RootCase switch
{
DiscoverHierarchyRequest.RootOneofCase.None => null,
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId => views.FirstOrDefault(
view => view.Object.GobjectId == request.RootGobjectId),
DiscoverHierarchyRequest.RootOneofCase.RootTagName => views.FirstOrDefault(
view => string.Equals(view.Object.TagName, request.RootTagName, StringComparison.OrdinalIgnoreCase)),
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath => views.FirstOrDefault(
view => string.Equals(view.ContainedPath, request.RootContainedPath, StringComparison.OrdinalIgnoreCase)),
_ => null,
};
if (request.RootCase != DiscoverHierarchyRequest.RootOneofCase.None && root is null)
{
throw new RpcException(new Status(StatusCode.NotFound, "DiscoverHierarchy root was not found."));
}
return root;
}
private static bool MatchesRoot(
GalaxyObjectView view,
GalaxyObjectView? root,
int? maxDepth)
{
if (root is null)
{
return true;
}
bool isRoot = view.Object.GobjectId == root.Object.GobjectId;
bool isDescendant = view.ContainedPath.StartsWith(root.ContainedPath + "/", StringComparison.OrdinalIgnoreCase);
if (!isRoot && !isDescendant)
{
return false;
}
return maxDepth is null || view.Depth - root.Depth <= maxDepth.Value;
}
private static bool MatchesBrowseSubtrees(
GalaxyObjectView view,
IReadOnlyList<string>? browseSubtreeGlobs)
{
return browseSubtreeGlobs is null
|| browseSubtreeGlobs.Count == 0
|| browseSubtreeGlobs.Any(glob => GalaxyGlobMatcher.IsMatch(view.ContainedPath, glob));
}
private static bool MatchesFilters(
GalaxyObject obj,
DiscoverHierarchyRequest request)
{
if (request.CategoryIds.Count > 0 && !request.CategoryIds.Contains(obj.CategoryId))
{
return false;
}
foreach (string templateFilter in request.TemplateChainContains)
{
if (!obj.TemplateChain.Any(template => template.Contains(templateFilter, StringComparison.OrdinalIgnoreCase)))
{
return false;
}
}
if (!string.IsNullOrWhiteSpace(request.TagNameGlob)
&& !GalaxyGlobMatcher.IsMatch(obj.TagName, request.TagNameGlob))
{
return false;
}
if (request.AlarmBearingOnly && !obj.Attributes.Any(attribute => attribute.IsAlarm))
{
return false;
}
if (request.HistorizedOnly && !obj.Attributes.Any(attribute => attribute.IsHistorized))
{
return false;
}
return true;
}
private static bool IncludeAttributes(DiscoverHierarchyRequest request)
{
return !request.HasIncludeAttributes || request.IncludeAttributes;
}
private static GalaxyObject CloneObject(GalaxyObject source, bool includeAttributes)
{
GalaxyObject clone = source.Clone();
if (!includeAttributes)
{
clone.Attributes.Clear();
}
return clone;
}
public static string ComputeFilterSignature(
DiscoverHierarchyRequest request,
IReadOnlyList<string>? browseSubtreeGlobs)
{
StringBuilder builder = new();
builder.Append("root=").Append(request.RootCase).Append('|');
builder.Append(request.RootCase switch
{
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId => request.RootGobjectId.ToString(
System.Globalization.CultureInfo.InvariantCulture),
DiscoverHierarchyRequest.RootOneofCase.RootTagName => request.RootTagName,
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath => request.RootContainedPath,
_ => string.Empty,
});
builder.Append("|max=").Append(request.MaxDepth?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? "");
builder.Append("|cat=").AppendJoin(',', request.CategoryIds.Order());
builder.Append("|tpl=").AppendJoin(',', request.TemplateChainContains.Order(StringComparer.OrdinalIgnoreCase));
builder.Append("|glob=").Append(request.TagNameGlob);
builder.Append("|attrs=").Append(request.HasIncludeAttributes ? request.IncludeAttributes.ToString() : "unset");
builder.Append("|alarm=").Append(request.AlarmBearingOnly);
builder.Append("|hist=").Append(request.HistorizedOnly);
builder.Append("|browse=").AppendJoin(',', (browseSubtreeGlobs ?? Array.Empty<string>()).Order(StringComparer.OrdinalIgnoreCase));
byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(builder.ToString()));
return Convert.ToHexString(hash, 0, 12);
}
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Server.Galaxy;
public sealed record GalaxyHierarchyQueryResult(
IReadOnlyList<GalaxyObject> Objects,
int TotalObjectCount,
string FilterSignature);
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Server.Galaxy;
public sealed record GalaxyObjectView(
GalaxyObject Object,
string ContainedPath,
int Depth);
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Server.Galaxy;
public sealed record GalaxyTagLookup(
GalaxyObject Object,
GalaxyAttribute? Attribute,
string ContainedPath);
@@ -3,6 +3,8 @@ using Grpc.Core;
using Microsoft.Data.SqlClient;
using MxGateway.Contracts.Proto.Galaxy;
using GalaxyDb = MxGateway.Server.Galaxy;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using ProtoGalaxyRepository = MxGateway.Contracts.Proto.Galaxy.GalaxyRepository;
namespace MxGateway.Server.Grpc;
@@ -18,9 +20,12 @@ public sealed class GalaxyRepositoryGrpcService(
GalaxyDb.GalaxyRepository repository,
GalaxyDb.IGalaxyHierarchyCache cache,
GalaxyDb.IGalaxyDeployNotifier notifier,
IGatewayRequestIdentityAccessor identityAccessor,
ILogger<GalaxyRepositoryGrpcService> logger) : ProtoGalaxyRepository.GalaxyRepositoryBase
{
private static readonly TimeSpan FirstLoadWaitBudget = TimeSpan.FromSeconds(5);
private const int DefaultDiscoverPageSize = 1000;
private const int MaxDiscoverPageSize = 5000;
public override async Task<TestConnectionReply> TestConnection(
TestConnectionRequest request,
@@ -59,16 +64,44 @@ public sealed class GalaxyRepositoryGrpcService(
await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false);
GalaxyDb.GalaxyHierarchyCacheEntry entry = cache.Current;
if (!entry.HasData || entry.Reply is null)
if (!entry.HasData)
{
throw new RpcException(new Status(
StatusCode.Unavailable,
ResolveUnavailableMessage(entry)));
}
// Same materialized reply is shared across all clients — gRPC serialization is
// read-only and the entry is replaced atomically on the next refresh.
return entry.Reply;
int pageSize = ResolvePageSize(request.PageSize);
IReadOnlyList<string> browseSubtrees = ResolveBrowseSubtrees();
string filterSignature = GalaxyDb.GalaxyHierarchyProjector.ComputeFilterSignature(request, browseSubtrees);
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
GalaxyDb.GalaxyHierarchyQueryResult query = GalaxyDb.GalaxyHierarchyProjector.Project(
entry,
request,
browseSubtrees,
pageToken.Offset,
pageSize);
int offset = pageToken.Offset;
if (offset > query.TotalObjectCount)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is outside the current hierarchy."));
}
DiscoverHierarchyReply reply = new()
{
TotalObjectCount = query.TotalObjectCount,
};
reply.Objects.Add(query.Objects);
int nextOffset = offset + query.Objects.Count;
if (nextOffset < query.TotalObjectCount)
{
reply.NextPageToken = FormatPageToken(entry.Sequence, query.FilterSignature, nextOffset);
}
return reply;
}
public override async Task WatchDeployEvents(
@@ -92,7 +125,7 @@ public sealed class GalaxyRepositoryGrpcService(
}
lastSeen = null;
await responseStream.WriteAsync(MapDeployEvent(info), context.CancellationToken).ConfigureAwait(false);
await responseStream.WriteAsync(MapDeployEvent(info, ResolveBrowseSubtrees()), context.CancellationToken).ConfigureAwait(false);
}
}
@@ -120,14 +153,28 @@ public sealed class GalaxyRepositoryGrpcService(
}
}
private static DeployEvent MapDeployEvent(GalaxyDb.GalaxyDeployEventInfo info)
private DeployEvent MapDeployEvent(
GalaxyDb.GalaxyDeployEventInfo info,
IReadOnlyList<string> browseSubtrees)
{
int objectCount = info.ObjectCount;
int attributeCount = info.AttributeCount;
if (browseSubtrees.Count > 0 && cache.Current.HasData)
{
GalaxyDb.GalaxyHierarchyQueryResult scoped = GalaxyDb.GalaxyHierarchyProjector.Project(
cache.Current,
new DiscoverHierarchyRequest(),
browseSubtrees);
objectCount = scoped.TotalObjectCount;
attributeCount = scoped.Objects.Sum(obj => obj.Attributes.Count);
}
DeployEvent ev = new()
{
Sequence = (ulong)info.Sequence,
ObservedAt = Timestamp.FromDateTimeOffset(info.ObservedAt),
ObjectCount = info.ObjectCount,
AttributeCount = info.AttributeCount,
ObjectCount = objectCount,
AttributeCount = attributeCount,
TimeOfLastDeployPresent = info.TimeOfLastDeploy.HasValue,
};
if (info.TimeOfLastDeploy.HasValue)
@@ -144,6 +191,80 @@ public sealed class GalaxyRepositoryGrpcService(
_ => "Galaxy cache has no data available.",
};
private static int ResolvePageSize(int requestedPageSize)
{
if (requestedPageSize < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_size must be greater than zero when provided."));
}
int pageSize = requestedPageSize == 0 ? DefaultDiscoverPageSize : requestedPageSize;
return Math.Min(pageSize, MaxDiscoverPageSize);
}
private IReadOnlyList<string> ResolveBrowseSubtrees()
{
ApiKeyConstraints constraints = identityAccessor.Current?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
return constraints.BrowseSubtrees;
}
private static string FormatPageToken(long sequence, string filterSignature, int offset)
{
return string.Concat(
sequence.ToString(System.Globalization.CultureInfo.InvariantCulture),
":",
filterSignature,
":",
offset.ToString(System.Globalization.CultureInfo.InvariantCulture));
}
private static PageToken ParsePageToken(string pageToken, long currentSequence, string currentFilterSignature)
{
if (string.IsNullOrWhiteSpace(pageToken))
{
return new PageToken(currentSequence, currentFilterSignature, Offset: 0);
}
string[] parts = pageToken.Split(':', count: 3);
if (parts.Length != 3
|| !long.TryParse(
parts[0],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out long sequence)
|| !int.TryParse(
parts[2],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out int offset)
|| offset < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is invalid."));
}
if (sequence != currentSequence)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is stale."));
}
if (!string.Equals(parts[1], currentFilterSignature, StringComparison.Ordinal))
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token does not match the current filters."));
}
return new PageToken(sequence, parts[1], offset);
}
private sealed record PageToken(long Sequence, string FilterSignature, int Offset);
[System.Diagnostics.CodeAnalysis.SuppressMessage(
"Style",
"IDE0051:Remove unused private members",
@@ -1,8 +1,10 @@
using System.Diagnostics;
using Grpc.Core;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
@@ -12,6 +14,7 @@ namespace MxGateway.Server.Grpc;
public sealed class MxAccessGatewayService(
ISessionManager sessionManager,
IGatewayRequestIdentityAccessor identityAccessor,
IConstraintEnforcer constraintEnforcer,
MxAccessGrpcRequestValidator requestValidator,
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
@@ -87,12 +90,35 @@ public sealed class MxAccessGatewayService(
try
{
requestValidator.ValidateInvoke(request);
WorkerCommand workerCommand = mapper.MapCommand(request);
GatewaySession session = ResolveSession(request.SessionId);
MxCommand command = request.Command;
BulkConstraintPlan? bulkConstraintPlan = await ApplyConstraintsAsync(
session,
command,
context.CancellationToken)
.ConfigureAwait(false);
MxCommand commandToInvoke = bulkConstraintPlan?.Command ?? command;
if (bulkConstraintPlan is { HasAllowedItems: false })
{
return CreateDeniedBulkReply(request, bulkConstraintPlan);
}
MxCommandRequest invokeRequest = request.Clone();
invokeRequest.Command = commandToInvoke;
WorkerCommand workerCommand = mapper.MapCommand(invokeRequest);
WorkerCommandReply workerReply = await sessionManager
.InvokeAsync(request.SessionId, workerCommand, context.CancellationToken)
.ConfigureAwait(false);
return mapper.MapCommandReply(workerReply);
MxCommandReply publicReply = mapper.MapCommandReply(workerReply);
if (bulkConstraintPlan is not null)
{
publicReply = MergeDeniedBulkResults(publicReply, command.Kind, bulkConstraintPlan);
}
session.TrackCommandReply(commandToInvoke, publicReply);
return publicReply;
}
catch (Exception exception) when (exception is not RpcException)
{
@@ -129,6 +155,323 @@ public sealed class MxAccessGatewayService(
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
}
private GatewaySession ResolveSession(string sessionId)
{
if (!sessionManager.TryGetSession(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private async Task<BulkConstraintPlan?> ApplyConstraintsAsync(
GatewaySession session,
MxCommand command,
CancellationToken cancellationToken)
{
ApiKeyIdentity? identity = identityAccessor.Current;
switch (command.Kind)
{
case MxCommandKind.AddItem:
await EnforceReadTagAsync(identity, command.Kind, command.AddItem.ItemDefinition, cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.AddItem2:
await EnforceReadTagAsync(identity, command.Kind, command.AddItem2.ItemDefinition, cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.AddItemBulk:
return await FilterTagBulkAsync(
identity,
command,
command.AddItemBulk.ServerHandle,
command.AddItemBulk.TagAddresses,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.SubscribeBulk:
return await FilterTagBulkAsync(
identity,
command,
command.SubscribeBulk.ServerHandle,
command.SubscribeBulk.TagAddresses,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.AdviseItemBulk:
return await FilterHandleBulkAsync(
identity,
session,
command,
command.AdviseItemBulk.ServerHandle,
command.AdviseItemBulk.ItemHandles,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.Write:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.Write.ServerHandle,
command.Write.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.Write2:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.Write2.ServerHandle,
command.Write2.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.WriteSecured:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.WriteSecured.ServerHandle,
command.WriteSecured.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.WriteSecured2:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.WriteSecured2.ServerHandle,
command.WriteSecured2.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
default:
return null;
}
}
private async Task EnforceReadTagAsync(
ApiKeyIdentity? identity,
MxCommandKind commandKind,
string tagAddress,
CancellationToken cancellationToken)
{
ConstraintFailure? failure = await constraintEnforcer
.CheckReadTagAsync(identity, tagAddress, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
return;
}
await constraintEnforcer.RecordDenialAsync(identity, commandKind.ToString(), tagAddress, failure, cancellationToken)
.ConfigureAwait(false);
throw new RpcException(new Status(StatusCode.PermissionDenied, failure.Message));
}
private async Task EnforceWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
MxCommandKind commandKind,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken)
{
ConstraintFailure? failure = await constraintEnforcer
.CheckWriteHandleAsync(identity, session, serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
return;
}
await constraintEnforcer.RecordDenialAsync(identity, commandKind.ToString(), itemHandle.ToString(System.Globalization.CultureInfo.InvariantCulture), failure, cancellationToken)
.ConfigureAwait(false);
throw new RpcException(new Status(StatusCode.PermissionDenied, failure.Message));
}
private async Task<BulkConstraintPlan?> FilterTagBulkAsync(
ApiKeyIdentity? identity,
MxCommand command,
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
Dictionary<int, SubscribeResult> denied = [];
List<string> allowed = [];
for (int index = 0; index < tagAddresses.Count; index++)
{
string tagAddress = tagAddresses[index];
ConstraintFailure? failure = await constraintEnforcer
.CheckReadTagAsync(identity, tagAddress, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
allowed.Add(tagAddress);
continue;
}
await constraintEnforcer.RecordDenialAsync(identity, command.Kind.ToString(), tagAddress, failure, cancellationToken)
.ConfigureAwait(false);
denied[index] = new SubscribeResult
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
WasSuccessful = false,
ErrorMessage = failure.Message,
};
}
if (denied.Count == 0)
{
return null;
}
MxCommand filtered = command.Clone();
if (filtered.Kind == MxCommandKind.AddItemBulk)
{
filtered.AddItemBulk.TagAddresses.Clear();
filtered.AddItemBulk.TagAddresses.Add(allowed);
}
else
{
filtered.SubscribeBulk.TagAddresses.Clear();
filtered.SubscribeBulk.TagAddresses.Add(allowed);
}
return new BulkConstraintPlan(filtered, tagAddresses.Count, denied, allowed.Count > 0);
}
private async Task<BulkConstraintPlan?> FilterHandleBulkAsync(
ApiKeyIdentity? identity,
GatewaySession session,
MxCommand command,
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
Dictionary<int, SubscribeResult> denied = [];
List<int> allowed = [];
for (int index = 0; index < itemHandles.Count; index++)
{
int itemHandle = itemHandles[index];
ConstraintFailure? failure = await constraintEnforcer
.CheckReadHandleAsync(identity, session, serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
allowed.Add(itemHandle);
continue;
}
await constraintEnforcer.RecordDenialAsync(identity, command.Kind.ToString(), itemHandle.ToString(System.Globalization.CultureInfo.InvariantCulture), failure, cancellationToken)
.ConfigureAwait(false);
denied[index] = new SubscribeResult
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
WasSuccessful = false,
ErrorMessage = failure.Message,
};
}
if (denied.Count == 0)
{
return null;
}
MxCommand filtered = command.Clone();
filtered.AdviseItemBulk.ItemHandles.Clear();
filtered.AdviseItemBulk.ItemHandles.Add(allowed);
return new BulkConstraintPlan(filtered, itemHandles.Count, denied, allowed.Count > 0);
}
private static MxCommandReply CreateDeniedBulkReply(
MxCommandRequest request,
BulkConstraintPlan plan)
{
MxCommandReply reply = new()
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
Kind = request.Command.Kind,
ProtocolStatus = MxAccessGrpcMapper.Ok(),
};
SetBulkPayload(reply, request.Command.Kind, BuildMergedBulkReply(new BulkSubscribeReply(), plan));
return reply;
}
private static MxCommandReply MergeDeniedBulkResults(
MxCommandReply reply,
MxCommandKind commandKind,
BulkConstraintPlan plan)
{
BulkSubscribeReply allowed = GetBulkPayload(reply, commandKind) ?? new BulkSubscribeReply();
SetBulkPayload(reply, commandKind, BuildMergedBulkReply(allowed, plan));
return reply;
}
private static BulkSubscribeReply BuildMergedBulkReply(
BulkSubscribeReply allowed,
BulkConstraintPlan plan)
{
Queue<SubscribeResult> allowedResults = new(allowed.Results);
BulkSubscribeReply merged = new();
for (int index = 0; index < plan.OriginalCount; index++)
{
if (plan.DeniedResults.TryGetValue(index, out SubscribeResult? denied))
{
merged.Results.Add(denied);
}
else if (allowedResults.TryDequeue(out SubscribeResult? allowedResult))
{
merged.Results.Add(allowedResult);
}
}
return merged;
}
private static BulkSubscribeReply? GetBulkPayload(MxCommandReply reply, MxCommandKind commandKind)
{
return commandKind switch
{
MxCommandKind.AddItemBulk => reply.AddItemBulk,
MxCommandKind.AdviseItemBulk => reply.AdviseItemBulk,
MxCommandKind.SubscribeBulk => reply.SubscribeBulk,
_ => null,
};
}
private static void SetBulkPayload(
MxCommandReply reply,
MxCommandKind commandKind,
BulkSubscribeReply payload)
{
switch (commandKind)
{
case MxCommandKind.AddItemBulk:
reply.AddItemBulk = payload;
break;
case MxCommandKind.AdviseItemBulk:
reply.AdviseItemBulk = payload;
break;
case MxCommandKind.SubscribeBulk:
reply.SubscribeBulk = payload;
break;
}
}
private sealed record BulkConstraintPlan(
MxCommand Command,
int OriginalCount,
IReadOnlyDictionary<int, SubscribeResult> DeniedResults,
bool HasAllowedItems);
private RpcException MapException(Exception exception)
{
if (exception is OperationCanceledException)
@@ -58,6 +58,7 @@ public sealed class ApiKeyAdminCliRunner(
SecretHash: hasher.HashSecret(secret),
DisplayName: Required(command.DisplayName),
Scopes: command.Scopes,
Constraints: command.Constraints,
CreatedUtc: DateTimeOffset.UtcNow),
cancellationToken)
.ConfigureAwait(false);
@@ -163,6 +164,7 @@ public sealed class ApiKeyAdminCliRunner(
KeyPrefix: key.KeyPrefix,
DisplayName: key.DisplayName,
Scopes: key.Scopes,
Constraints: key.Constraints,
CreatedUtc: key.CreatedUtc,
LastUsedUtc: key.LastUsedUtc,
RevokedUtc: key.RevokedUtc);
@@ -7,4 +7,5 @@ public sealed record ApiKeyAdminCommand(
string? Pepper,
string? KeyId,
string? DisplayName,
IReadOnlySet<string> Scopes);
IReadOnlySet<string> Scopes,
ApiKeyConstraints Constraints);
@@ -19,7 +19,7 @@ public static class ApiKeyAdminCommandLineParser
return ApiKeyAdminParseResult.Fail($"Unknown apikey subcommand '{args[1]}'.");
}
Dictionary<string, string?> options = new(StringComparer.OrdinalIgnoreCase);
Dictionary<string, List<string?>> options = new(StringComparer.OrdinalIgnoreCase);
bool json = false;
for (int index = 2; index < args.Count; index++)
@@ -49,18 +49,42 @@ public static class ApiKeyAdminCommandLineParser
{
if (index + 1 >= args.Count || args[index + 1].StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Option '--{name}' requires a value.");
if (IsBooleanConstraintFlag(name))
{
value = "true";
}
else
{
return ApiKeyAdminParseResult.Fail($"Option '--{name}' requires a value.");
}
}
else
{
value = args[++index];
}
value = args[++index];
}
options[name] = value;
if (!options.TryGetValue(name, out List<string?>? values))
{
values = [];
options[name] = values;
}
values.Add(value);
}
string? keyId = GetOption(options, "key-id");
string? displayName = GetOption(options, "display-name");
IReadOnlySet<string> scopes = ParseScopes(GetOption(options, "scopes"));
ApiKeyConstraints constraints;
try
{
constraints = ParseConstraints(options);
}
catch (FormatException exception)
{
return ApiKeyAdminParseResult.Fail(exception.Message);
}
string? validationError = Validate(kind, keyId, displayName);
if (validationError is not null)
@@ -75,7 +99,8 @@ public static class ApiKeyAdminCommandLineParser
Pepper: GetOption(options, "pepper"),
KeyId: keyId,
DisplayName: displayName,
Scopes: scopes));
Scopes: scopes,
Constraints: constraints));
}
private static bool TryParseKind(string value, out ApiKeyAdminCommandKind kind)
@@ -144,9 +169,56 @@ public static class ApiKeyAdminCommandLineParser
|| character is '.' or '-');
}
private static string? GetOption(Dictionary<string, string?> options, string name)
private static string? GetOption(Dictionary<string, List<string?>> options, string name)
{
return options.TryGetValue(name, out string? value) ? value : null;
return options.TryGetValue(name, out List<string?>? values) && values.Count > 0 ? values[^1] : null;
}
private static IReadOnlyList<string> GetOptions(Dictionary<string, List<string?>> options, string name)
{
return options.TryGetValue(name, out List<string?>? values)
? values.Where(value => !string.IsNullOrWhiteSpace(value)).Select(value => value!).ToArray()
: Array.Empty<string>();
}
private static bool HasFlag(Dictionary<string, List<string?>> options, string name)
{
return options.ContainsKey(name);
}
private static bool IsBooleanConstraintFlag(string name)
{
return string.Equals(name, "read-alarm-only", StringComparison.OrdinalIgnoreCase)
|| string.Equals(name, "read-historized-only", StringComparison.OrdinalIgnoreCase);
}
private static ApiKeyConstraints ParseConstraints(Dictionary<string, List<string?>> options)
{
return new ApiKeyConstraints(
ReadSubtrees: GetOptions(options, "read-subtree"),
WriteSubtrees: GetOptions(options, "write-subtree"),
ReadTagGlobs: GetOptions(options, "read-tag-glob"),
WriteTagGlobs: GetOptions(options, "write-tag-glob"),
MaxWriteClassification: ParseNullableInt(GetOption(options, "max-write-classification")),
BrowseSubtrees: GetOptions(options, "browse-subtree"),
ReadAlarmOnly: HasFlag(options, "read-alarm-only"),
ReadHistorizedOnly: HasFlag(options, "read-historized-only"));
}
private static int? ParseNullableInt(string? value)
{
if (string.IsNullOrWhiteSpace(value))
{
return null;
}
return int.TryParse(
value,
System.Globalization.NumberStyles.Integer,
System.Globalization.CultureInfo.InvariantCulture,
out int parsed)
? parsed
: throw new FormatException("--max-write-classification must be an integer.");
}
private static IReadOnlySet<string> ParseScopes(string? scopes)
@@ -5,6 +5,7 @@ public sealed record ApiKeyAdminListedKey(
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes,
ApiKeyConstraints Constraints,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,28 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyConstraintSerializer
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
WriteIndented = false,
};
public static string? Serialize(ApiKeyConstraints constraints)
{
ArgumentNullException.ThrowIfNull(constraints);
return constraints.IsEmpty ? null : JsonSerializer.Serialize(constraints, JsonOptions);
}
public static ApiKeyConstraints Deserialize(string? json)
{
if (string.IsNullOrWhiteSpace(json))
{
return ApiKeyConstraints.Empty;
}
return JsonSerializer.Deserialize<ApiKeyConstraints>(json, JsonOptions) ?? ApiKeyConstraints.Empty;
}
}
@@ -0,0 +1,43 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyConstraints(
IReadOnlyList<string> ReadSubtrees,
IReadOnlyList<string> WriteSubtrees,
IReadOnlyList<string> ReadTagGlobs,
IReadOnlyList<string> WriteTagGlobs,
int? MaxWriteClassification,
IReadOnlyList<string> BrowseSubtrees,
bool ReadAlarmOnly,
bool ReadHistorizedOnly)
{
public static ApiKeyConstraints Empty { get; } = new(
ReadSubtrees: Array.Empty<string>(),
WriteSubtrees: Array.Empty<string>(),
ReadTagGlobs: Array.Empty<string>(),
WriteTagGlobs: Array.Empty<string>(),
MaxWriteClassification: null,
BrowseSubtrees: Array.Empty<string>(),
ReadAlarmOnly: false,
ReadHistorizedOnly: false);
public bool IsEmpty =>
ReadSubtrees.Count == 0
&& WriteSubtrees.Count == 0
&& ReadTagGlobs.Count == 0
&& WriteTagGlobs.Count == 0
&& MaxWriteClassification is null
&& BrowseSubtrees.Count == 0
&& !ReadAlarmOnly
&& !ReadHistorizedOnly;
public bool HasReadConstraints =>
ReadSubtrees.Count > 0
|| ReadTagGlobs.Count > 0
|| ReadAlarmOnly
|| ReadHistorizedOnly;
public bool HasWriteConstraints =>
WriteSubtrees.Count > 0
|| WriteTagGlobs.Count > 0
|| MaxWriteClassification is not null;
}
@@ -6,4 +6,5 @@ public sealed record ApiKeyCreateRequest(
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
ApiKeyConstraints Constraints,
DateTimeOffset CreatedUtc);
@@ -4,4 +4,8 @@ public sealed record ApiKeyIdentity(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes);
IReadOnlySet<string> Scopes,
ApiKeyConstraints? Constraints = null)
{
public ApiKeyConstraints EffectiveConstraints => Constraints ?? ApiKeyConstraints.Empty;
}
@@ -6,6 +6,7 @@ public sealed record ApiKeyRecord(
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
ApiKeyConstraints Constraints,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -12,9 +12,10 @@ public static class ApiKeyRecordReader
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
Constraints: ApiKeyConstraintSerializer.Deserialize(reader.IsDBNull(5) ? null : reader.GetString(5)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(6), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 7),
RevokedUtc: ReadNullableDateTimeOffset(reader, 8));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
@@ -52,6 +52,7 @@ public sealed class ApiKeyVerifier(
KeyId: storedKey.KeyId,
KeyPrefix: storedKey.KeyPrefix,
DisplayName: storedKey.DisplayName,
Scopes: storedKey.Scopes));
Scopes: storedKey.Scopes,
Constraints: storedKey.Constraints));
}
}
@@ -17,6 +17,7 @@ public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectio
secret_hash,
display_name,
scopes,
constraints,
created_utc,
last_used_utc,
revoked_utc)
@@ -26,6 +27,7 @@ public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectio
$secret_hash,
$display_name,
$scopes,
$constraints,
$created_utc,
NULL,
NULL);
@@ -42,7 +44,7 @@ public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectio
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
SELECT key_id, key_prefix, secret_hash, display_name, scopes, constraints, created_utc, last_used_utc, revoked_utc
FROM api_keys
ORDER BY key_id;
""";
@@ -111,6 +113,9 @@ public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectio
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = request.SecretHash;
command.Parameters.AddWithValue("$display_name", request.DisplayName);
command.Parameters.AddWithValue("$scopes", ApiKeyScopeSerializer.Serialize(request.Scopes));
command.Parameters.AddWithValue(
"$constraints",
(object?)ApiKeyConstraintSerializer.Serialize(request.Constraints) ?? DBNull.Value);
command.Parameters.AddWithValue("$created_utc", request.CreatedUtc.ToString("O"));
}
}
@@ -42,12 +42,12 @@ public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFact
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = requireActive
? """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
SELECT key_id, key_prefix, secret_hash, display_name, scopes, constraints, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id AND revoked_utc IS NULL;
"""
: """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
SELECT key_id, key_prefix, secret_hash, display_name, scopes, constraints, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id;
""";
@@ -2,7 +2,7 @@ namespace MxGateway.Server.Security.Authentication;
public static class SqliteAuthSchema
{
public const int CurrentVersion = 1;
public const int CurrentVersion = 2;
public const string SchemaVersionTable = "schema_version";
@@ -22,6 +22,8 @@ public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connecti
}
await ApplyVersionOneAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await ApplyVersionTwoAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await WriteSchemaVersionAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
@@ -83,6 +85,7 @@ public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connecti
secret_hash BLOB NOT NULL,
display_name TEXT NOT NULL,
scopes TEXT NOT NULL,
constraints TEXT NULL,
created_utc TEXT NOT NULL,
last_used_utc TEXT NULL,
revoked_utc TEXT NULL
@@ -105,6 +108,34 @@ public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connecti
""",
cancellationToken).ConfigureAwait(false);
}
private static async Task ApplyVersionTwoAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
if (await ColumnExistsAsync(connection, transaction, SqliteAuthSchema.ApiKeysTable, "constraints", cancellationToken)
.ConfigureAwait(false))
{
return;
}
await ExecuteNonQueryAsync(
connection,
transaction,
"""
ALTER TABLE api_keys
ADD COLUMN constraints TEXT NULL;
""",
cancellationToken).ConfigureAwait(false);
}
private static async Task WriteSchemaVersionAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
@@ -120,6 +151,31 @@ public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connecti
await versionCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task<bool> ColumnExistsAsync(
SqliteConnection connection,
SqliteTransaction transaction,
string tableName,
string columnName,
CancellationToken cancellationToken)
{
await using SqliteCommand command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText = $"PRAGMA table_info({tableName});";
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
if (string.Equals(reader.GetString(1), columnName, StringComparison.OrdinalIgnoreCase))
{
return true;
}
}
return false;
}
private static async Task ExecuteNonQueryAsync(
SqliteConnection connection,
SqliteTransaction transaction,
@@ -0,0 +1,165 @@
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Server.Galaxy;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Sessions;
namespace MxGateway.Server.Security.Authorization;
public sealed class ConstraintEnforcer(
IGalaxyHierarchyCache cache,
IApiKeyAuditStore auditStore) : IConstraintEnforcer
{
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken)
{
ApiKeyConstraints constraints = identity?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
if (!constraints.HasReadConstraints)
{
return Task.FromResult<ConstraintFailure?>(null);
}
return Task.FromResult(CheckReadTarget(constraints, tagAddress));
}
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken)
{
ApiKeyConstraints constraints = identity?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
if (!constraints.HasReadConstraints)
{
return Task.FromResult<ConstraintFailure?>(null);
}
if (!session.TryGetItemRegistration(serverHandle, itemHandle, out SessionItemRegistration registration))
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure("item_handle", "Item handle is not registered in the constrained session."));
}
return Task.FromResult(CheckReadTarget(constraints, registration.TagAddress));
}
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken)
{
ApiKeyConstraints constraints = identity?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
if (!constraints.HasWriteConstraints)
{
return Task.FromResult<ConstraintFailure?>(null);
}
if (!session.TryGetItemRegistration(serverHandle, itemHandle, out SessionItemRegistration registration))
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure("item_handle", "Item handle is not registered in the constrained session."));
}
GalaxyTagLookup? target = ResolveTarget(registration.TagAddress);
if (target is null)
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure("tag_metadata", "Tag metadata is not available in the Galaxy hierarchy cache."));
}
if (!MatchesPathOrTag(target.ContainedPath, registration.TagAddress, constraints.WriteSubtrees, constraints.WriteTagGlobs))
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure("write_scope", "Tag is outside the API key write scope."));
}
if (constraints.MaxWriteClassification is { } maxClassification)
{
GalaxyAttribute? attribute = target.Attribute;
if (attribute is null)
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure("max_write_classification", "Attribute security classification is not available."));
}
if (attribute.SecurityClassification > maxClassification)
{
return Task.FromResult<ConstraintFailure?>(new ConstraintFailure(
"max_write_classification",
$"Attribute security classification {attribute.SecurityClassification} exceeds allowed maximum {maxClassification}."));
}
}
return Task.FromResult<ConstraintFailure?>(null);
}
public async Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken)
{
await auditStore.AppendAsync(
new ApiKeyAuditEntry(
KeyId: identity?.KeyId,
EventType: "constraint-denied",
RemoteAddress: null,
Details: $"{commandKind}: {target}: {failure.ConstraintName}: {failure.Message}"),
cancellationToken)
.ConfigureAwait(false);
}
private ConstraintFailure? CheckReadTarget(
ApiKeyConstraints constraints,
string tagAddress)
{
GalaxyTagLookup? target = ResolveTarget(tagAddress);
if (target is null)
{
return new ConstraintFailure("tag_metadata", "Tag metadata is not available in the Galaxy hierarchy cache.");
}
if (!MatchesPathOrTag(target.ContainedPath, tagAddress, constraints.ReadSubtrees, constraints.ReadTagGlobs))
{
return new ConstraintFailure("read_scope", "Tag is outside the API key read scope.");
}
if (constraints.ReadAlarmOnly && target.Attribute is not { IsAlarm: true })
{
return new ConstraintFailure("read_alarm_only", "Tag is not an alarm-bearing attribute.");
}
if (constraints.ReadHistorizedOnly && target.Attribute is not { IsHistorized: true })
{
return new ConstraintFailure("read_historized_only", "Tag is not a historized attribute.");
}
return null;
}
private GalaxyTagLookup? ResolveTarget(string tagAddress)
{
GalaxyHierarchyCacheEntry entry = cache.Current;
return !string.IsNullOrWhiteSpace(tagAddress)
&& entry.Index.TagsByAddress.TryGetValue(tagAddress, out GalaxyTagLookup? lookup)
? lookup
: null;
}
private static bool MatchesPathOrTag(
string containedPath,
string tagAddress,
IReadOnlyList<string> subtreeGlobs,
IReadOnlyList<string> tagGlobs)
{
bool hasSubtreeConstraint = subtreeGlobs.Count > 0;
bool hasTagConstraint = tagGlobs.Count > 0;
if (!hasSubtreeConstraint && !hasTagConstraint)
{
return true;
}
return subtreeGlobs.Any(glob => GalaxyGlobMatcher.IsMatch(containedPath, glob))
|| tagGlobs.Any(glob => GalaxyGlobMatcher.IsMatch(tagAddress, glob));
}
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authorization;
public sealed record ConstraintFailure(string ConstraintName, string Message);
@@ -1,4 +1,6 @@
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Configuration;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authorization;
@@ -8,7 +10,17 @@ public static class GrpcAuthorizationServiceCollectionExtensions
{
services.AddSingleton<GatewayGrpcScopeResolver>();
services.AddSingleton<IGatewayRequestIdentityAccessor, GatewayRequestIdentityAccessor>();
services.AddSingleton<IConstraintEnforcer, ConstraintEnforcer>();
services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services
.AddOptions<global::Grpc.AspNetCore.Server.GrpcServiceOptions>()
.Configure<IConfiguration>((grpcOptions, configuration) =>
{
ProtocolOptions protocolOptions = new();
configuration.GetSection("MxGateway:Protocol").Bind(protocolOptions);
grpcOptions.MaxReceiveMessageSize = protocolOptions.MaxGrpcMessageBytes;
grpcOptions.MaxSendMessageSize = protocolOptions.MaxGrpcMessageBytes;
});
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());
return services;
@@ -0,0 +1,33 @@
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Sessions;
namespace MxGateway.Server.Security.Authorization;
public interface IConstraintEnforcer
{
Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken);
Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken);
Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken);
Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken);
}
+124 -1
View File
@@ -14,6 +14,7 @@ public sealed class GatewaySession
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
public GatewaySession(
string sessionId,
@@ -27,6 +28,35 @@ public sealed class GatewaySession
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
: this(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
clientSessionName,
clientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
TimeSpan.FromMinutes(30),
openedAt)
{
}
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
TimeSpan leaseDuration,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -58,8 +88,10 @@ public sealed class GatewaySession
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
LeaseDuration = leaseDuration;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
_leaseExpiresAt = openedAt + leaseDuration;
}
public string SessionId { get; }
@@ -82,6 +114,8 @@ public sealed class GatewaySession
public TimeSpan ShutdownTimeout { get; }
public TimeSpan LeaseDuration { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
@@ -195,6 +229,7 @@ public sealed class GatewaySession
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
_leaseExpiresAt = activityAt + LeaseDuration;
}
}
@@ -210,7 +245,9 @@ public sealed class GatewaySession
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
return _activeEventSubscriberCount == 0
&& _leaseExpiresAt is not null
&& _leaseExpiresAt <= now;
}
}
@@ -247,6 +284,58 @@ public sealed class GatewaySession
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public bool TryGetItemRegistration(
int serverHandle,
int itemHandle,
out SessionItemRegistration registration)
{
lock (_syncRoot)
{
return _items.TryGetValue((serverHandle, itemHandle), out registration!);
}
}
public void TrackCommandReply(
MxCommand command,
MxCommandReply reply)
{
if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
{
return;
}
lock (_syncRoot)
{
switch (command.Kind)
{
case MxCommandKind.AddItem when reply.AddItem is not null:
TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, command.AddItem.ItemDefinition);
break;
case MxCommandKind.AddItem2 when reply.AddItem2 is not null:
TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, command.AddItem2.ItemDefinition);
break;
case MxCommandKind.AddBufferedItem when reply.AddBufferedItem is not null:
TrackItem(command.AddBufferedItem.ServerHandle, reply.AddBufferedItem.ItemHandle, command.AddBufferedItem.ItemDefinition);
break;
case MxCommandKind.AddItemBulk when reply.AddItemBulk is not null:
TrackBulkItems(reply.AddItemBulk);
break;
case MxCommandKind.SubscribeBulk when reply.SubscribeBulk is not null:
TrackBulkItems(reply.SubscribeBulk);
break;
case MxCommandKind.RemoveItem:
_items.Remove((command.RemoveItem.ServerHandle, command.RemoveItem.ItemHandle));
break;
case MxCommandKind.RemoveItemBulk:
RemoveItems(command.RemoveItemBulk.ServerHandle, command.RemoveItemBulk.ItemHandles);
break;
case MxCommandKind.UnsubscribeBulk:
RemoveItems(command.UnsubscribeBulk.ServerHandle, command.UnsubscribeBulk.ItemHandles);
break;
}
}
}
public Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
@@ -485,6 +574,40 @@ public sealed class GatewaySession
}
}
private void TrackItem(
int serverHandle,
int itemHandle,
string tagAddress)
{
if (itemHandle == 0 || string.IsNullOrWhiteSpace(tagAddress))
{
return;
}
_items[(serverHandle, itemHandle)] = new SessionItemRegistration(serverHandle, itemHandle, tagAddress);
}
private void TrackBulkItems(BulkSubscribeReply reply)
{
foreach (SubscribeResult result in reply.Results)
{
if (result.WasSuccessful)
{
TrackItem(result.ServerHandle, result.ItemHandle, result.TagAddress);
}
}
}
private void RemoveItems(
int serverHandle,
IEnumerable<int> itemHandles)
{
foreach (int itemHandle in itemHandles)
{
_items.Remove((serverHandle, itemHandle));
}
}
private void DetachEventSubscriber()
{
lock (_syncRoot)
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Sessions;
public sealed record SessionItemRegistration(
int ServerHandle,
int ItemHandle,
string TagAddress);
@@ -0,0 +1,45 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Sessions;
public sealed class SessionLeaseMonitorHostedService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
ILogger<SessionLeaseMonitorHostedService> logger,
TimeProvider? timeProvider = null) : BackgroundService
{
private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
TimeSpan interval = TimeSpan.FromSeconds(Math.Max(1, options.Value.Sessions.LeaseSweepIntervalSeconds));
using PeriodicTimer timer = new(interval, _timeProvider);
try
{
while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
{
try
{
await sessionManager
.CloseExpiredLeasesAsync(_timeProvider.GetUtcNow(), stoppingToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception exception)
{
logger.LogWarning(exception, "Session lease sweep failed.");
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
}
}
@@ -287,6 +287,7 @@ public sealed class SessionManager : ISessionManager
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
TimeSpan leaseDuration = TimeSpan.FromSeconds(_options.Sessions.DefaultLeaseSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
@@ -303,6 +304,7 @@ public sealed class SessionManager : ISessionManager
commandTimeout,
startupTimeout,
shutdownTimeout,
leaseDuration,
openedAt);
}
@@ -7,6 +7,7 @@ public static class SessionServiceCollectionExtensions
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
services.AddHostedService<SessionLeaseMonitorHostedService>();
services.AddHostedService<SessionShutdownHostedService>();
return services;
+36 -3
View File
@@ -231,11 +231,17 @@ public sealed class WorkerClient : IWorkerClient
}
WorkerClientState state = State;
if (state is WorkerClientState.Closed or WorkerClientState.Faulted)
if (state == WorkerClientState.Closed)
{
return;
}
if (state == WorkerClientState.Faulted)
{
KillOwnedProcess("ShutdownFaulted");
return;
}
MarkClosing();
await EnqueueAsync(CreateShutdownEnvelope(timeout, "gateway-shutdown"), cancellationToken).ConfigureAwait(false);
_outboundEnvelopes.Writer.TryComplete();
@@ -263,8 +269,7 @@ public sealed class WorkerClient : IWorkerClient
public void Kill(string reason)
{
ThrowIfDisposed();
_connection.ProcessHandle?.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
KillOwnedProcess(reason);
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
$"Worker was killed by the gateway: {reason}.",
@@ -279,6 +284,7 @@ public sealed class WorkerClient : IWorkerClient
}
_disposed = true;
KillOwnedProcess("Dispose");
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
@@ -607,12 +613,39 @@ public sealed class WorkerClient : IWorkerClient
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete(fault);
_events.Writer.TryComplete(fault);
KillOwnedProcess(errorCode.ToString());
CompletePendingCommands(fault);
RecordWorkerStoppedOnce(errorCode.ToString());
_metrics?.Fault(errorCode.ToString());
_logger.LogWarning(exception, "Worker client faulted for session {SessionId}: {Message}", SessionId, message);
}
private void KillOwnedProcess(string reason)
{
WorkerProcessHandle? processHandle = _connection.ProcessHandle;
if (processHandle is null)
{
return;
}
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
}
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Failed to kill worker process {ProcessId} for session {SessionId}.",
processHandle.ProcessId,
SessionId);
}
}
private void RecordWorkerStoppedOnce(string reason)
{
bool shouldRecord;
+5 -1
View File
@@ -25,6 +25,9 @@
"Sessions": {
"DefaultCommandTimeoutSeconds": 30,
"MaxSessions": 64,
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
},
"Events": {
@@ -42,7 +45,8 @@
"ShowTagValues": false
},
"Protocol": {
"WorkerProtocolVersion": 1
"WorkerProtocolVersion": 1,
"MaxGrpcMessageBytes": 16777216
},
"Galaxy": {
"ConnectionString": "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;",
@@ -30,6 +30,8 @@ public sealed class GatewayOptionsTests
Assert.Equal(30, options.Sessions.DefaultCommandTimeoutSeconds);
Assert.Equal(64, options.Sessions.MaxSessions);
Assert.Equal(1800, options.Sessions.DefaultLeaseSeconds);
Assert.Equal(30, options.Sessions.LeaseSweepIntervalSeconds);
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
Assert.Equal(10_000, options.Events.QueueCapacity);
@@ -45,6 +47,7 @@ public sealed class GatewayOptionsTests
Assert.False(options.Dashboard.ShowTagValues);
Assert.Equal(1u, options.Protocol.WorkerProtocolVersion);
Assert.Equal(16 * 1024 * 1024, options.Protocol.MaxGrpcMessageBytes);
}
[Fact]
@@ -56,22 +59,29 @@ public sealed class GatewayOptionsTests
["MxGateway:Authentication:Mode"] = "Disabled",
["MxGateway:Worker:ExecutablePath"] = @"C:\Gateway\MxGateway.Worker.exe",
["MxGateway:Sessions:MaxSessions"] = "12",
["MxGateway:Sessions:DefaultLeaseSeconds"] = "900",
["MxGateway:Events:QueueCapacity"] = "256",
["MxGateway:Dashboard:Enabled"] = "false"
["MxGateway:Dashboard:Enabled"] = "false",
["MxGateway:Protocol:MaxGrpcMessageBytes"] = "8388608"
});
Assert.Equal(AuthenticationMode.Disabled, options.Authentication.Mode);
Assert.Equal(@"C:\Gateway\MxGateway.Worker.exe", options.Worker.ExecutablePath);
Assert.Equal(12, options.Sessions.MaxSessions);
Assert.Equal(900, options.Sessions.DefaultLeaseSeconds);
Assert.Equal(256, options.Events.QueueCapacity);
Assert.False(options.Dashboard.Enabled);
Assert.Equal(8 * 1024 * 1024, options.Protocol.MaxGrpcMessageBytes);
}
[Theory]
[InlineData("MxGateway:Worker:ExecutablePath", "worker.dll", "MxGateway:Worker:ExecutablePath must point to a .exe file.")]
[InlineData("MxGateway:Worker:StartupProbeRetryAttempts", "0", "MxGateway:Worker:StartupProbeRetryAttempts must be greater than zero.")]
[InlineData("MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds", "0", "MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.")]
[InlineData("MxGateway:Sessions:DefaultLeaseSeconds", "0", "MxGateway:Sessions:DefaultLeaseSeconds must be greater than zero.")]
[InlineData("MxGateway:Sessions:LeaseSweepIntervalSeconds", "0", "MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.")]
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
[InlineData("MxGateway:Protocol:MaxGrpcMessageBytes", "0", "MxGateway:Protocol:MaxGrpcMessageBytes must be between")]
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
[InlineData("MxGateway:Dashboard:PathBase", "dashboard", "MxGateway:Dashboard:PathBase must start with '/'.")]
public void Validation_InvalidConfiguration_FailsClearly(string key, string value, string expectedFailure)
@@ -11,9 +11,9 @@ public sealed class GatewayContractInfoTests
}
[Fact]
public void GatewayProtocolVersion_StartsAtVersionOne()
public void GatewayProtocolVersion_IsVersionTwo()
{
Assert.Equal(1u, GatewayContractInfo.GatewayProtocolVersion);
Assert.Equal(2u, GatewayContractInfo.GatewayProtocolVersion);
}
[Fact]

Some files were not shown because too many files have changed in this diff Show More