Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a5098e6815 | |||
| 41ddd122a6 | |||
| a25f09e795 | |||
| 37da9d8f44 | |||
| a19af5f7cb | |||
| 03ab36c4d5 | |||
| 91ea71b0b7 | |||
| 7dfec6dc8c | |||
| a462f68dbd | |||
| 16c18954b6 |
@@ -0,0 +1,46 @@
|
||||
# Protobuf Contracts
|
||||
|
||||
The contracts project contains the public gRPC API and the gateway-to-worker
|
||||
IPC messages. The `.proto` files are the source of truth; generated C# files are
|
||||
recreated by the contracts project build.
|
||||
|
||||
## Files
|
||||
|
||||
`src/MxGateway.Contracts/Protos/mxaccess_gateway.proto` defines the public
|
||||
`MxAccessGateway` gRPC service, command payloads, command replies, event DTOs,
|
||||
`MxValue`, `MxArray`, and `MxStatusProxy`.
|
||||
|
||||
`src/MxGateway.Contracts/Protos/mxaccess_worker.proto` defines the named-pipe
|
||||
worker IPC envelope and control messages. It imports
|
||||
`mxaccess_gateway.proto` so the worker and gateway use the same command, reply,
|
||||
event, value, and status shapes.
|
||||
|
||||
Generated C# output is written to `src/MxGateway.Contracts/Generated/`. Do not
|
||||
hand-edit generated files.
|
||||
|
||||
## Generation
|
||||
|
||||
Run the contracts build to regenerate C# protobuf and gRPC code:
|
||||
|
||||
```bash
|
||||
dotnet build src/MxGateway.Contracts/MxGateway.Contracts.csproj
|
||||
```
|
||||
|
||||
Run the focused contract tests after changing either `.proto` file:
|
||||
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter ProtobufContractRoundTripTests
|
||||
```
|
||||
|
||||
The full solution build also regenerates the C# contracts before compiling
|
||||
gateway and test projects:
|
||||
|
||||
```bash
|
||||
dotnet build src/MxGateway.sln
|
||||
```
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Gateway Process Detailed Design](./gateway-process-design.md)
|
||||
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
|
||||
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
|
||||
@@ -0,0 +1,54 @@
|
||||
# Worker Frame Protocol
|
||||
|
||||
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
|
||||
messages over a bidirectional named pipe. The frame layer is deliberately small:
|
||||
it handles message boundaries, size limits, protobuf parsing, and envelope
|
||||
validation before higher-level worker client code routes commands, replies,
|
||||
events, and faults.
|
||||
|
||||
## Frame Format
|
||||
|
||||
Each frame starts with a four-byte little-endian unsigned payload length,
|
||||
followed by the serialized `WorkerEnvelope` payload:
|
||||
|
||||
```text
|
||||
uint32 little-endian payload_length
|
||||
payload_length bytes protobuf WorkerEnvelope
|
||||
```
|
||||
|
||||
The reader rejects zero-length payloads and payloads larger than the configured
|
||||
maximum before allocating the payload buffer. The default maximum is 16 MiB,
|
||||
matching the gateway process design.
|
||||
|
||||
## Envelope Validation
|
||||
|
||||
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
|
||||
owning session before returning or writing it:
|
||||
|
||||
- `protocol_version` must match the configured worker protocol version,
|
||||
- `session_id` must match the owning gateway session,
|
||||
- the envelope must contain one typed `body` value.
|
||||
|
||||
Protocol violations throw `WorkerFrameProtocolException` with a
|
||||
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
|
||||
oversized frames, protocol version mismatches, and session mismatches.
|
||||
|
||||
## Verification
|
||||
|
||||
Run the focused tests after changing the frame protocol:
|
||||
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
|
||||
```
|
||||
|
||||
Run the gateway build because the frame protocol is part of
|
||||
`MxGateway.Server`:
|
||||
|
||||
```bash
|
||||
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
|
||||
```
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Gateway Process Detailed Design](./gateway-process-design.md)
|
||||
- [Protobuf Contracts](./Contracts.md)
|
||||
@@ -105,6 +105,12 @@ Do not let Razor components directly mutate gateway session or worker objects.
|
||||
Create a small read-only dashboard service that projects gateway state into
|
||||
plain DTOs.
|
||||
|
||||
`GatewayMetrics.GetSnapshot()` is the metrics input for the first dashboard
|
||||
projection. It carries current session and worker gauges, command and event
|
||||
counters, queue depth, and fault totals. The dashboard reads that snapshot
|
||||
instead of reading raw `Meter` instruments because exporter configuration is an
|
||||
operations concern, not a UI dependency.
|
||||
|
||||
Suggested service:
|
||||
|
||||
```csharp
|
||||
@@ -361,4 +367,3 @@ The first dashboard slice should implement:
|
||||
8. workers page with worker table.
|
||||
9. 1-second realtime refresh through Blazor Server.
|
||||
10. redaction tests for secrets.
|
||||
|
||||
|
||||
@@ -664,6 +664,26 @@ Metrics:
|
||||
|
||||
Do not log credential values or full tag values by default.
|
||||
|
||||
The gateway registers `GatewayMetrics` as the in-process metrics foundation.
|
||||
It emits .NET `Meter` instruments for collectors and keeps a
|
||||
`GatewayMetricsSnapshot` for dashboard projection. The snapshot exists because
|
||||
the dashboard needs current counters and queue depths without depending on a
|
||||
specific metrics exporter.
|
||||
|
||||
HTTP request handling uses `UseGatewayRequestLoggingScope()` to attach common
|
||||
structured log fields when request metadata is present:
|
||||
|
||||
- `SessionId`,
|
||||
- `ClientIdentity`,
|
||||
- `WorkerProcessId`,
|
||||
- `CorrelationId`,
|
||||
- `CommandMethod`.
|
||||
|
||||
`GatewayLogRedactor` redacts API key secrets and command values before they are
|
||||
added to log state. Value logging remains opt-in and redacted by default so
|
||||
secured writes, authentication commands, and ordinary tag values do not leak
|
||||
through diagnostics.
|
||||
|
||||
## Configuration
|
||||
|
||||
Suggested configuration shape:
|
||||
@@ -710,6 +730,18 @@ Suggested configuration shape:
|
||||
|
||||
Do not scatter connection or path constants through implementation code.
|
||||
|
||||
`MxGateway.Server` binds this section to `GatewayOptions` at startup and
|
||||
registers validation with `ValidateOnStart()`. Startup fails before the gateway
|
||||
begins serving traffic when required authentication settings are missing,
|
||||
timeouts or queue sizes are not positive, dashboard settings are malformed, or
|
||||
the configured worker protocol version does not match the contract version.
|
||||
|
||||
The gateway exposes read-only effective settings through
|
||||
`IGatewayConfigurationProvider`. This projection is for dashboard settings and
|
||||
diagnostics, so it redacts secret-related fields such as
|
||||
`Authentication:PepperSecretName` and does not include raw API keys or key
|
||||
material.
|
||||
|
||||
## Galaxy Repository Metadata
|
||||
|
||||
Galaxy hierarchy and tag metadata can be discovered through SQL Server when
|
||||
|
||||
@@ -45,6 +45,8 @@ Detailed follow-up docs:
|
||||
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
||||
session manager, worker supervision, gRPC API, event streaming, fault model,
|
||||
security, observability, and test strategy.
|
||||
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
|
||||
reader/writer and `WorkerEnvelope` validation rules.
|
||||
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
||||
MXAccess worker instance, including STA ownership, message pumping, COM
|
||||
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
||||
@@ -97,6 +99,13 @@ Responsibilities:
|
||||
|
||||
The gateway must never instantiate or call MXAccess directly.
|
||||
|
||||
The gateway observability foundation lives in `MxGateway.Server.Diagnostics`
|
||||
and `MxGateway.Server.Metrics`. Structured logging scopes carry session,
|
||||
worker, correlation, command, and client identity fields with redaction applied
|
||||
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
|
||||
histograms through .NET `Meter` and a snapshot API that dashboard services can
|
||||
project without binding to a metrics exporter.
|
||||
|
||||
### Worker Process
|
||||
|
||||
Runtime:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,268 @@
|
||||
// <auto-generated>
|
||||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
// source: mxaccess_gateway.proto
|
||||
// </auto-generated>
|
||||
#pragma warning disable 0414, 1591, 8981, 0612
|
||||
#region Designer generated code
|
||||
|
||||
using grpc = global::Grpc.Core;
|
||||
|
||||
namespace MxGateway.Contracts.Proto {
|
||||
/// <summary>
|
||||
/// Public client API for MXAccess sessions hosted by the gateway.
|
||||
/// </summary>
|
||||
public static partial class MxAccessGateway
|
||||
{
|
||||
static readonly string __ServiceName = "mxaccess_gateway.v1.MxAccessGateway";
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
|
||||
{
|
||||
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
|
||||
if (message is global::Google.Protobuf.IBufferMessage)
|
||||
{
|
||||
context.SetPayloadLength(message.CalculateSize());
|
||||
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
|
||||
context.Complete();
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static class __Helper_MessageCache<T>
|
||||
{
|
||||
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
|
||||
{
|
||||
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
|
||||
if (__Helper_MessageCache<T>.IsBufferMessage)
|
||||
{
|
||||
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
|
||||
}
|
||||
#endif
|
||||
return parser.ParseFrom(context.PayloadAsNewBuffer());
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.OpenSessionRequest> __Marshaller_mxaccess_gateway_v1_OpenSessionRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.OpenSessionRequest.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.OpenSessionReply> __Marshaller_mxaccess_gateway_v1_OpenSessionReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.OpenSessionReply.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.CloseSessionRequest> __Marshaller_mxaccess_gateway_v1_CloseSessionRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.CloseSessionRequest.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.CloseSessionReply> __Marshaller_mxaccess_gateway_v1_CloseSessionReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.CloseSessionReply.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.MxCommandRequest> __Marshaller_mxaccess_gateway_v1_MxCommandRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.MxCommandRequest.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.MxCommandReply> __Marshaller_mxaccess_gateway_v1_MxCommandReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.MxCommandReply.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.StreamEventsRequest> __Marshaller_mxaccess_gateway_v1_StreamEventsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.StreamEventsRequest.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.MxEvent> __Marshaller_mxaccess_gateway_v1_MxEvent = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.MxEvent.Parser));
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.OpenSessionRequest, global::MxGateway.Contracts.Proto.OpenSessionReply> __Method_OpenSession = new grpc::Method<global::MxGateway.Contracts.Proto.OpenSessionRequest, global::MxGateway.Contracts.Proto.OpenSessionReply>(
|
||||
grpc::MethodType.Unary,
|
||||
__ServiceName,
|
||||
"OpenSession",
|
||||
__Marshaller_mxaccess_gateway_v1_OpenSessionRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_OpenSessionReply);
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.CloseSessionRequest, global::MxGateway.Contracts.Proto.CloseSessionReply> __Method_CloseSession = new grpc::Method<global::MxGateway.Contracts.Proto.CloseSessionRequest, global::MxGateway.Contracts.Proto.CloseSessionReply>(
|
||||
grpc::MethodType.Unary,
|
||||
__ServiceName,
|
||||
"CloseSession",
|
||||
__Marshaller_mxaccess_gateway_v1_CloseSessionRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_CloseSessionReply);
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.MxCommandRequest, global::MxGateway.Contracts.Proto.MxCommandReply> __Method_Invoke = new grpc::Method<global::MxGateway.Contracts.Proto.MxCommandRequest, global::MxGateway.Contracts.Proto.MxCommandReply>(
|
||||
grpc::MethodType.Unary,
|
||||
__ServiceName,
|
||||
"Invoke",
|
||||
__Marshaller_mxaccess_gateway_v1_MxCommandRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_MxCommandReply);
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.StreamEventsRequest, global::MxGateway.Contracts.Proto.MxEvent> __Method_StreamEvents = new grpc::Method<global::MxGateway.Contracts.Proto.StreamEventsRequest, global::MxGateway.Contracts.Proto.MxEvent>(
|
||||
grpc::MethodType.ServerStreaming,
|
||||
__ServiceName,
|
||||
"StreamEvents",
|
||||
__Marshaller_mxaccess_gateway_v1_StreamEventsRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_MxEvent);
|
||||
|
||||
/// <summary>Service descriptor</summary>
|
||||
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
|
||||
{
|
||||
get { return global::MxGateway.Contracts.Proto.MxaccessGatewayReflection.Descriptor.Services[0]; }
|
||||
}
|
||||
|
||||
/// <summary>Base class for server-side implementations of MxAccessGateway</summary>
|
||||
[grpc::BindServiceMethod(typeof(MxAccessGateway), "BindService")]
|
||||
public abstract partial class MxAccessGatewayBase
|
||||
{
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::System.Threading.Tasks.Task<global::MxGateway.Contracts.Proto.OpenSessionReply> OpenSession(global::MxGateway.Contracts.Proto.OpenSessionRequest request, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::System.Threading.Tasks.Task<global::MxGateway.Contracts.Proto.CloseSessionReply> CloseSession(global::MxGateway.Contracts.Proto.CloseSessionRequest request, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::System.Threading.Tasks.Task<global::MxGateway.Contracts.Proto.MxCommandReply> Invoke(global::MxGateway.Contracts.Proto.MxCommandRequest request, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::System.Threading.Tasks.Task StreamEvents(global::MxGateway.Contracts.Proto.StreamEventsRequest request, grpc::IServerStreamWriter<global::MxGateway.Contracts.Proto.MxEvent> responseStream, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Client for MxAccessGateway</summary>
|
||||
public partial class MxAccessGatewayClient : grpc::ClientBase<MxAccessGatewayClient>
|
||||
{
|
||||
/// <summary>Creates a new client for MxAccessGateway</summary>
|
||||
/// <param name="channel">The channel to use to make remote calls.</param>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public MxAccessGatewayClient(grpc::ChannelBase channel) : base(channel)
|
||||
{
|
||||
}
|
||||
/// <summary>Creates a new client for MxAccessGateway that uses a custom <c>CallInvoker</c>.</summary>
|
||||
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public MxAccessGatewayClient(grpc::CallInvoker callInvoker) : base(callInvoker)
|
||||
{
|
||||
}
|
||||
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
protected MxAccessGatewayClient() : base()
|
||||
{
|
||||
}
|
||||
/// <summary>Protected constructor to allow creation of configured clients.</summary>
|
||||
/// <param name="configuration">The client configuration.</param>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
protected MxAccessGatewayClient(ClientBaseConfiguration configuration) : base(configuration)
|
||||
{
|
||||
}
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.OpenSessionReply OpenSession(global::MxGateway.Contracts.Proto.OpenSessionRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return OpenSession(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.OpenSessionReply OpenSession(global::MxGateway.Contracts.Proto.OpenSessionRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.BlockingUnaryCall(__Method_OpenSession, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.OpenSessionReply> OpenSessionAsync(global::MxGateway.Contracts.Proto.OpenSessionRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return OpenSessionAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.OpenSessionReply> OpenSessionAsync(global::MxGateway.Contracts.Proto.OpenSessionRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncUnaryCall(__Method_OpenSession, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.CloseSessionReply CloseSession(global::MxGateway.Contracts.Proto.CloseSessionRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return CloseSession(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.CloseSessionReply CloseSession(global::MxGateway.Contracts.Proto.CloseSessionRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.BlockingUnaryCall(__Method_CloseSession, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.CloseSessionReply> CloseSessionAsync(global::MxGateway.Contracts.Proto.CloseSessionRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return CloseSessionAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.CloseSessionReply> CloseSessionAsync(global::MxGateway.Contracts.Proto.CloseSessionRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncUnaryCall(__Method_CloseSession, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.MxCommandReply Invoke(global::MxGateway.Contracts.Proto.MxCommandRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return Invoke(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::MxGateway.Contracts.Proto.MxCommandReply Invoke(global::MxGateway.Contracts.Proto.MxCommandRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.BlockingUnaryCall(__Method_Invoke, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.MxCommandReply> InvokeAsync(global::MxGateway.Contracts.Proto.MxCommandRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return InvokeAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncUnaryCall<global::MxGateway.Contracts.Proto.MxCommandReply> InvokeAsync(global::MxGateway.Contracts.Proto.MxCommandRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncUnaryCall(__Method_Invoke, null, options, request);
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.MxEvent> StreamEvents(global::MxGateway.Contracts.Proto.StreamEventsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return StreamEvents(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.MxEvent> StreamEvents(global::MxGateway.Contracts.Proto.StreamEventsRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncServerStreamingCall(__Method_StreamEvents, null, options, request);
|
||||
}
|
||||
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
protected override MxAccessGatewayClient NewInstance(ClientBaseConfiguration configuration)
|
||||
{
|
||||
return new MxAccessGatewayClient(configuration);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Creates service definition that can be registered with a server</summary>
|
||||
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public static grpc::ServerServiceDefinition BindService(MxAccessGatewayBase serviceImpl)
|
||||
{
|
||||
return grpc::ServerServiceDefinition.CreateBuilder()
|
||||
.AddMethod(__Method_OpenSession, serviceImpl.OpenSession)
|
||||
.AddMethod(__Method_CloseSession, serviceImpl.CloseSession)
|
||||
.AddMethod(__Method_Invoke, serviceImpl.Invoke)
|
||||
.AddMethod(__Method_StreamEvents, serviceImpl.StreamEvents).Build();
|
||||
}
|
||||
|
||||
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
|
||||
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
|
||||
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
|
||||
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public static void BindService(grpc::ServiceBinderBase serviceBinder, MxAccessGatewayBase serviceImpl)
|
||||
{
|
||||
serviceBinder.AddMethod(__Method_OpenSession, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::MxGateway.Contracts.Proto.OpenSessionRequest, global::MxGateway.Contracts.Proto.OpenSessionReply>(serviceImpl.OpenSession));
|
||||
serviceBinder.AddMethod(__Method_CloseSession, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::MxGateway.Contracts.Proto.CloseSessionRequest, global::MxGateway.Contracts.Proto.CloseSessionReply>(serviceImpl.CloseSession));
|
||||
serviceBinder.AddMethod(__Method_Invoke, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::MxGateway.Contracts.Proto.MxCommandRequest, global::MxGateway.Contracts.Proto.MxCommandReply>(serviceImpl.Invoke));
|
||||
serviceBinder.AddMethod(__Method_StreamEvents, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::MxGateway.Contracts.Proto.StreamEventsRequest, global::MxGateway.Contracts.Proto.MxEvent>(serviceImpl.StreamEvents));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
File diff suppressed because it is too large
Load Diff
@@ -4,4 +4,19 @@
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Remove="Generated\**\*.cs" />
|
||||
<Protobuf Include="Protos\mxaccess_gateway.proto" ProtoRoot="Protos" OutputDir="Generated" GrpcOutputDir="Generated" GrpcServices="Both" />
|
||||
<Protobuf Include="Protos\mxaccess_worker.proto" ProtoRoot="Protos" OutputDir="Generated" GrpcServices="None" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Google.Protobuf" Version="3.34.1" />
|
||||
<PackageReference Include="Grpc.Core.Api" Version="2.76.0" />
|
||||
<PackageReference Include="Grpc.Tools" Version="2.80.0">
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -0,0 +1,521 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package mxaccess_gateway.v1;
|
||||
|
||||
option csharp_namespace = "MxGateway.Contracts.Proto";
|
||||
|
||||
import "google/protobuf/duration.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
// Public client API for MXAccess sessions hosted by the gateway.
|
||||
service MxAccessGateway {
|
||||
rpc OpenSession(OpenSessionRequest) returns (OpenSessionReply);
|
||||
rpc CloseSession(CloseSessionRequest) returns (CloseSessionReply);
|
||||
rpc Invoke(MxCommandRequest) returns (MxCommandReply);
|
||||
rpc StreamEvents(StreamEventsRequest) returns (stream MxEvent);
|
||||
}
|
||||
|
||||
message OpenSessionRequest {
|
||||
string requested_backend = 1;
|
||||
string client_session_name = 2;
|
||||
string client_correlation_id = 3;
|
||||
google.protobuf.Duration command_timeout = 4;
|
||||
}
|
||||
|
||||
message OpenSessionReply {
|
||||
string session_id = 1;
|
||||
string backend_name = 2;
|
||||
int32 worker_process_id = 3;
|
||||
uint32 worker_protocol_version = 4;
|
||||
repeated string capabilities = 5;
|
||||
google.protobuf.Duration default_command_timeout = 6;
|
||||
ProtocolStatus protocol_status = 7;
|
||||
}
|
||||
|
||||
message CloseSessionRequest {
|
||||
string session_id = 1;
|
||||
string client_correlation_id = 2;
|
||||
}
|
||||
|
||||
message CloseSessionReply {
|
||||
string session_id = 1;
|
||||
SessionState final_state = 2;
|
||||
ProtocolStatus protocol_status = 3;
|
||||
}
|
||||
|
||||
message StreamEventsRequest {
|
||||
string session_id = 1;
|
||||
uint64 after_worker_sequence = 2;
|
||||
}
|
||||
|
||||
message MxCommandRequest {
|
||||
string session_id = 1;
|
||||
string client_correlation_id = 2;
|
||||
MxCommand command = 3;
|
||||
}
|
||||
|
||||
message MxCommand {
|
||||
MxCommandKind kind = 1;
|
||||
|
||||
oneof payload {
|
||||
RegisterCommand register = 10;
|
||||
UnregisterCommand unregister = 11;
|
||||
AddItemCommand add_item = 12;
|
||||
AddItem2Command add_item2 = 13;
|
||||
RemoveItemCommand remove_item = 14;
|
||||
AdviseCommand advise = 15;
|
||||
UnAdviseCommand un_advise = 16;
|
||||
AdviseSupervisoryCommand advise_supervisory = 17;
|
||||
AddBufferedItemCommand add_buffered_item = 18;
|
||||
SetBufferedUpdateIntervalCommand set_buffered_update_interval = 19;
|
||||
SuspendCommand suspend = 20;
|
||||
ActivateCommand activate = 21;
|
||||
WriteCommand write = 22;
|
||||
Write2Command write2 = 23;
|
||||
WriteSecuredCommand write_secured = 24;
|
||||
WriteSecured2Command write_secured2 = 25;
|
||||
AuthenticateUserCommand authenticate_user = 26;
|
||||
ArchestrAUserToIdCommand archestra_user_to_id = 27;
|
||||
PingCommand ping = 100;
|
||||
GetSessionStateCommand get_session_state = 101;
|
||||
GetWorkerInfoCommand get_worker_info = 102;
|
||||
DrainEventsCommand drain_events = 103;
|
||||
ShutdownWorkerCommand shutdown_worker = 104;
|
||||
}
|
||||
}
|
||||
|
||||
enum MxCommandKind {
|
||||
MX_COMMAND_KIND_UNSPECIFIED = 0;
|
||||
MX_COMMAND_KIND_REGISTER = 1;
|
||||
MX_COMMAND_KIND_UNREGISTER = 2;
|
||||
MX_COMMAND_KIND_ADD_ITEM = 3;
|
||||
MX_COMMAND_KIND_ADD_ITEM2 = 4;
|
||||
MX_COMMAND_KIND_REMOVE_ITEM = 5;
|
||||
MX_COMMAND_KIND_ADVISE = 6;
|
||||
MX_COMMAND_KIND_UN_ADVISE = 7;
|
||||
MX_COMMAND_KIND_ADVISE_SUPERVISORY = 8;
|
||||
MX_COMMAND_KIND_ADD_BUFFERED_ITEM = 9;
|
||||
MX_COMMAND_KIND_SET_BUFFERED_UPDATE_INTERVAL = 10;
|
||||
MX_COMMAND_KIND_SUSPEND = 11;
|
||||
MX_COMMAND_KIND_ACTIVATE = 12;
|
||||
MX_COMMAND_KIND_WRITE = 13;
|
||||
MX_COMMAND_KIND_WRITE2 = 14;
|
||||
MX_COMMAND_KIND_WRITE_SECURED = 15;
|
||||
MX_COMMAND_KIND_WRITE_SECURED2 = 16;
|
||||
MX_COMMAND_KIND_AUTHENTICATE_USER = 17;
|
||||
MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID = 18;
|
||||
MX_COMMAND_KIND_PING = 100;
|
||||
MX_COMMAND_KIND_GET_SESSION_STATE = 101;
|
||||
MX_COMMAND_KIND_GET_WORKER_INFO = 102;
|
||||
MX_COMMAND_KIND_DRAIN_EVENTS = 103;
|
||||
MX_COMMAND_KIND_SHUTDOWN_WORKER = 104;
|
||||
}
|
||||
|
||||
message RegisterCommand {
|
||||
string client_name = 1;
|
||||
}
|
||||
|
||||
message UnregisterCommand {
|
||||
int32 server_handle = 1;
|
||||
}
|
||||
|
||||
message AddItemCommand {
|
||||
int32 server_handle = 1;
|
||||
string item_definition = 2;
|
||||
}
|
||||
|
||||
message AddItem2Command {
|
||||
int32 server_handle = 1;
|
||||
string item_definition = 2;
|
||||
string item_context = 3;
|
||||
}
|
||||
|
||||
message RemoveItemCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message AdviseCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message UnAdviseCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message AdviseSupervisoryCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message AddBufferedItemCommand {
|
||||
int32 server_handle = 1;
|
||||
string item_definition = 2;
|
||||
string item_context = 3;
|
||||
}
|
||||
|
||||
message SetBufferedUpdateIntervalCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 update_interval_milliseconds = 2;
|
||||
}
|
||||
|
||||
message SuspendCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message ActivateCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
}
|
||||
|
||||
message WriteCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
MxValue value = 3;
|
||||
int32 user_id = 4;
|
||||
}
|
||||
|
||||
message Write2Command {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
MxValue value = 3;
|
||||
MxValue timestamp_value = 4;
|
||||
int32 user_id = 5;
|
||||
}
|
||||
|
||||
message WriteSecuredCommand {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
int32 current_user_id = 3;
|
||||
int32 verifier_user_id = 4;
|
||||
// Credential-sensitive write value. Implementations must not log this field
|
||||
// unless an explicit redacted value-logging path is enabled.
|
||||
MxValue value = 5;
|
||||
}
|
||||
|
||||
message WriteSecured2Command {
|
||||
int32 server_handle = 1;
|
||||
int32 item_handle = 2;
|
||||
int32 current_user_id = 3;
|
||||
int32 verifier_user_id = 4;
|
||||
// Credential-sensitive write value. Implementations must not log this field
|
||||
// unless an explicit redacted value-logging path is enabled.
|
||||
MxValue value = 5;
|
||||
MxValue timestamp_value = 6;
|
||||
}
|
||||
|
||||
message AuthenticateUserCommand {
|
||||
int32 server_handle = 1;
|
||||
string verify_user = 2;
|
||||
// Raw MXAccess credential. Implementations must keep this field out of logs,
|
||||
// metrics labels, command lines, and diagnostics.
|
||||
string verify_user_password = 3;
|
||||
}
|
||||
|
||||
message ArchestrAUserToIdCommand {
|
||||
int32 server_handle = 1;
|
||||
string user_id_guid = 2;
|
||||
}
|
||||
|
||||
message PingCommand {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
message GetSessionStateCommand {
|
||||
}
|
||||
|
||||
message GetWorkerInfoCommand {
|
||||
}
|
||||
|
||||
message DrainEventsCommand {
|
||||
uint32 max_events = 1;
|
||||
}
|
||||
|
||||
message ShutdownWorkerCommand {
|
||||
google.protobuf.Duration grace_period = 1;
|
||||
}
|
||||
|
||||
message MxCommandReply {
|
||||
string session_id = 1;
|
||||
string correlation_id = 2;
|
||||
MxCommandKind kind = 3;
|
||||
ProtocolStatus protocol_status = 4;
|
||||
// HRESULT captured from MXAccess or a COM exception. This remains separate
|
||||
// from gateway protocol status so MXAccess parity details are not hidden by
|
||||
// transport failures.
|
||||
optional int32 hresult = 5;
|
||||
MxValue return_value = 6;
|
||||
repeated MxStatusProxy statuses = 7;
|
||||
string diagnostic_message = 8;
|
||||
|
||||
oneof payload {
|
||||
RegisterReply register = 20;
|
||||
AddItemReply add_item = 21;
|
||||
AddItem2Reply add_item2 = 22;
|
||||
AddBufferedItemReply add_buffered_item = 23;
|
||||
SuspendReply suspend = 24;
|
||||
ActivateReply activate = 25;
|
||||
AuthenticateUserReply authenticate_user = 26;
|
||||
ArchestrAUserToIdReply archestra_user_to_id = 27;
|
||||
SessionStateReply session_state = 100;
|
||||
WorkerInfoReply worker_info = 101;
|
||||
DrainEventsReply drain_events = 102;
|
||||
}
|
||||
}
|
||||
|
||||
message RegisterReply {
|
||||
int32 server_handle = 1;
|
||||
}
|
||||
|
||||
message AddItemReply {
|
||||
int32 item_handle = 1;
|
||||
}
|
||||
|
||||
message AddItem2Reply {
|
||||
int32 item_handle = 1;
|
||||
}
|
||||
|
||||
message AddBufferedItemReply {
|
||||
int32 item_handle = 1;
|
||||
}
|
||||
|
||||
message SuspendReply {
|
||||
MxStatusProxy status = 1;
|
||||
}
|
||||
|
||||
message ActivateReply {
|
||||
MxStatusProxy status = 1;
|
||||
}
|
||||
|
||||
message AuthenticateUserReply {
|
||||
int32 user_id = 1;
|
||||
}
|
||||
|
||||
message ArchestrAUserToIdReply {
|
||||
int32 user_id = 1;
|
||||
}
|
||||
|
||||
message SessionStateReply {
|
||||
SessionState state = 1;
|
||||
}
|
||||
|
||||
message WorkerInfoReply {
|
||||
int32 worker_process_id = 1;
|
||||
string worker_version = 2;
|
||||
string mxaccess_progid = 3;
|
||||
string mxaccess_clsid = 4;
|
||||
}
|
||||
|
||||
message DrainEventsReply {
|
||||
repeated MxEvent events = 1;
|
||||
}
|
||||
|
||||
message MxEvent {
|
||||
MxEventFamily family = 1;
|
||||
string session_id = 2;
|
||||
int32 server_handle = 3;
|
||||
int32 item_handle = 4;
|
||||
MxValue value = 5;
|
||||
int32 quality = 6;
|
||||
google.protobuf.Timestamp source_timestamp = 7;
|
||||
repeated MxStatusProxy statuses = 8;
|
||||
uint64 worker_sequence = 9;
|
||||
google.protobuf.Timestamp worker_timestamp = 10;
|
||||
google.protobuf.Timestamp gateway_receive_timestamp = 11;
|
||||
optional int32 hresult = 12;
|
||||
string raw_status = 13;
|
||||
|
||||
oneof body {
|
||||
OnDataChangeEvent on_data_change = 20;
|
||||
OnWriteCompleteEvent on_write_complete = 21;
|
||||
OperationCompleteEvent operation_complete = 22;
|
||||
OnBufferedDataChangeEvent on_buffered_data_change = 23;
|
||||
}
|
||||
}
|
||||
|
||||
enum MxEventFamily {
|
||||
MX_EVENT_FAMILY_UNSPECIFIED = 0;
|
||||
MX_EVENT_FAMILY_ON_DATA_CHANGE = 1;
|
||||
MX_EVENT_FAMILY_ON_WRITE_COMPLETE = 2;
|
||||
MX_EVENT_FAMILY_OPERATION_COMPLETE = 3;
|
||||
MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE = 4;
|
||||
}
|
||||
|
||||
message OnDataChangeEvent {
|
||||
}
|
||||
|
||||
message OnWriteCompleteEvent {
|
||||
}
|
||||
|
||||
message OperationCompleteEvent {
|
||||
}
|
||||
|
||||
message OnBufferedDataChangeEvent {
|
||||
MxDataType data_type = 1;
|
||||
MxArray quality_values = 2;
|
||||
MxArray timestamp_values = 3;
|
||||
int32 raw_data_type = 4;
|
||||
}
|
||||
|
||||
message MxStatusProxy {
|
||||
int32 success = 1;
|
||||
MxStatusCategory category = 2;
|
||||
MxStatusSource detected_by = 3;
|
||||
int32 detail = 4;
|
||||
int32 raw_category = 5;
|
||||
int32 raw_detected_by = 6;
|
||||
string diagnostic_text = 7;
|
||||
}
|
||||
|
||||
enum MxStatusCategory {
|
||||
MX_STATUS_CATEGORY_UNSPECIFIED = 0;
|
||||
MX_STATUS_CATEGORY_UNKNOWN = 1;
|
||||
MX_STATUS_CATEGORY_OK = 2;
|
||||
MX_STATUS_CATEGORY_PENDING = 3;
|
||||
MX_STATUS_CATEGORY_WARNING = 4;
|
||||
MX_STATUS_CATEGORY_COMMUNICATION_ERROR = 5;
|
||||
MX_STATUS_CATEGORY_CONFIGURATION_ERROR = 6;
|
||||
MX_STATUS_CATEGORY_OPERATIONAL_ERROR = 7;
|
||||
MX_STATUS_CATEGORY_SECURITY_ERROR = 8;
|
||||
MX_STATUS_CATEGORY_SOFTWARE_ERROR = 9;
|
||||
MX_STATUS_CATEGORY_OTHER_ERROR = 10;
|
||||
}
|
||||
|
||||
enum MxStatusSource {
|
||||
MX_STATUS_SOURCE_UNSPECIFIED = 0;
|
||||
MX_STATUS_SOURCE_UNKNOWN = 1;
|
||||
MX_STATUS_SOURCE_REQUESTING_LMX = 2;
|
||||
MX_STATUS_SOURCE_RESPONDING_LMX = 3;
|
||||
MX_STATUS_SOURCE_REQUESTING_NMX = 4;
|
||||
MX_STATUS_SOURCE_RESPONDING_NMX = 5;
|
||||
MX_STATUS_SOURCE_REQUESTING_AUTOMATION_OBJECT = 6;
|
||||
MX_STATUS_SOURCE_RESPONDING_AUTOMATION_OBJECT = 7;
|
||||
}
|
||||
|
||||
message MxValue {
|
||||
MxDataType data_type = 1;
|
||||
string variant_type = 2;
|
||||
bool is_null = 3;
|
||||
string raw_diagnostic = 4;
|
||||
int32 raw_data_type = 5;
|
||||
|
||||
oneof kind {
|
||||
bool bool_value = 10;
|
||||
int32 int32_value = 11;
|
||||
int64 int64_value = 12;
|
||||
float float_value = 13;
|
||||
double double_value = 14;
|
||||
string string_value = 15;
|
||||
google.protobuf.Timestamp timestamp_value = 16;
|
||||
MxArray array_value = 17;
|
||||
bytes raw_value = 18;
|
||||
}
|
||||
}
|
||||
|
||||
message MxArray {
|
||||
MxDataType element_data_type = 1;
|
||||
string variant_type = 2;
|
||||
repeated uint32 dimensions = 3;
|
||||
string raw_diagnostic = 4;
|
||||
int32 raw_element_data_type = 5;
|
||||
|
||||
oneof values {
|
||||
BoolArray bool_values = 10;
|
||||
Int32Array int32_values = 11;
|
||||
Int64Array int64_values = 12;
|
||||
FloatArray float_values = 13;
|
||||
DoubleArray double_values = 14;
|
||||
StringArray string_values = 15;
|
||||
TimestampArray timestamp_values = 16;
|
||||
RawArray raw_values = 17;
|
||||
}
|
||||
}
|
||||
|
||||
message BoolArray {
|
||||
repeated bool values = 1;
|
||||
}
|
||||
|
||||
message Int32Array {
|
||||
repeated int32 values = 1;
|
||||
}
|
||||
|
||||
message Int64Array {
|
||||
repeated int64 values = 1;
|
||||
}
|
||||
|
||||
message FloatArray {
|
||||
repeated float values = 1;
|
||||
}
|
||||
|
||||
message DoubleArray {
|
||||
repeated double values = 1;
|
||||
}
|
||||
|
||||
message StringArray {
|
||||
repeated string values = 1;
|
||||
}
|
||||
|
||||
message TimestampArray {
|
||||
repeated google.protobuf.Timestamp values = 1;
|
||||
}
|
||||
|
||||
message RawArray {
|
||||
repeated bytes values = 1;
|
||||
}
|
||||
|
||||
enum MxDataType {
|
||||
MX_DATA_TYPE_UNSPECIFIED = 0;
|
||||
MX_DATA_TYPE_UNKNOWN = 1;
|
||||
MX_DATA_TYPE_NO_DATA = 2;
|
||||
MX_DATA_TYPE_BOOLEAN = 3;
|
||||
MX_DATA_TYPE_INTEGER = 4;
|
||||
MX_DATA_TYPE_FLOAT = 5;
|
||||
MX_DATA_TYPE_DOUBLE = 6;
|
||||
MX_DATA_TYPE_STRING = 7;
|
||||
MX_DATA_TYPE_TIME = 8;
|
||||
MX_DATA_TYPE_ELAPSED_TIME = 9;
|
||||
MX_DATA_TYPE_REFERENCE_TYPE = 10;
|
||||
MX_DATA_TYPE_STATUS_TYPE = 11;
|
||||
MX_DATA_TYPE_ENUM = 12;
|
||||
MX_DATA_TYPE_SECURITY_CLASSIFICATION_ENUM = 13;
|
||||
MX_DATA_TYPE_DATA_QUALITY_TYPE = 14;
|
||||
MX_DATA_TYPE_QUALIFIED_ENUM = 15;
|
||||
MX_DATA_TYPE_QUALIFIED_STRUCT = 16;
|
||||
MX_DATA_TYPE_INTERNATIONALIZED_STRING = 17;
|
||||
MX_DATA_TYPE_BIG_STRING = 18;
|
||||
MX_DATA_TYPE_END = 19;
|
||||
}
|
||||
|
||||
message ProtocolStatus {
|
||||
ProtocolStatusCode code = 1;
|
||||
string message = 2;
|
||||
}
|
||||
|
||||
enum ProtocolStatusCode {
|
||||
PROTOCOL_STATUS_CODE_UNSPECIFIED = 0;
|
||||
PROTOCOL_STATUS_CODE_OK = 1;
|
||||
PROTOCOL_STATUS_CODE_INVALID_REQUEST = 2;
|
||||
PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND = 3;
|
||||
PROTOCOL_STATUS_CODE_SESSION_NOT_READY = 4;
|
||||
PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE = 5;
|
||||
PROTOCOL_STATUS_CODE_TIMEOUT = 6;
|
||||
PROTOCOL_STATUS_CODE_CANCELED = 7;
|
||||
PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION = 8;
|
||||
PROTOCOL_STATUS_CODE_MXACCESS_FAILURE = 9;
|
||||
}
|
||||
|
||||
enum SessionState {
|
||||
SESSION_STATE_UNSPECIFIED = 0;
|
||||
SESSION_STATE_CREATING = 1;
|
||||
SESSION_STATE_STARTING_WORKER = 2;
|
||||
SESSION_STATE_WAITING_FOR_PIPE = 3;
|
||||
SESSION_STATE_HANDSHAKING = 4;
|
||||
SESSION_STATE_INITIALIZING_WORKER = 5;
|
||||
SESSION_STATE_READY = 6;
|
||||
SESSION_STATE_CLOSING = 7;
|
||||
SESSION_STATE_CLOSED = 8;
|
||||
SESSION_STATE_FAULTED = 9;
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package mxaccess_worker.v1;
|
||||
|
||||
option csharp_namespace = "MxGateway.Contracts.Proto";
|
||||
|
||||
import "google/protobuf/duration.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "mxaccess_gateway.proto";
|
||||
|
||||
// Gateway-to-worker IPC envelope. Named-pipe framing prepends a little-endian
|
||||
// uint32 payload length to this protobuf payload.
|
||||
message WorkerEnvelope {
|
||||
uint32 protocol_version = 1;
|
||||
string session_id = 2;
|
||||
uint64 sequence = 3;
|
||||
string correlation_id = 4;
|
||||
|
||||
oneof body {
|
||||
GatewayHello gateway_hello = 10;
|
||||
WorkerHello worker_hello = 11;
|
||||
WorkerReady worker_ready = 12;
|
||||
WorkerCommand worker_command = 13;
|
||||
WorkerCommandReply worker_command_reply = 14;
|
||||
WorkerCancel worker_cancel = 15;
|
||||
WorkerShutdown worker_shutdown = 16;
|
||||
WorkerShutdownAck worker_shutdown_ack = 17;
|
||||
WorkerEvent worker_event = 18;
|
||||
WorkerHeartbeat worker_heartbeat = 19;
|
||||
WorkerFault worker_fault = 20;
|
||||
}
|
||||
}
|
||||
|
||||
message GatewayHello {
|
||||
uint32 supported_protocol_version = 1;
|
||||
string nonce = 2;
|
||||
string gateway_version = 3;
|
||||
}
|
||||
|
||||
message WorkerHello {
|
||||
uint32 protocol_version = 1;
|
||||
string nonce = 2;
|
||||
int32 worker_process_id = 3;
|
||||
string worker_version = 4;
|
||||
}
|
||||
|
||||
message WorkerReady {
|
||||
int32 worker_process_id = 1;
|
||||
string mxaccess_progid = 2;
|
||||
string mxaccess_clsid = 3;
|
||||
google.protobuf.Timestamp ready_timestamp = 4;
|
||||
}
|
||||
|
||||
message WorkerCommand {
|
||||
mxaccess_gateway.v1.MxCommand command = 1;
|
||||
google.protobuf.Timestamp enqueue_timestamp = 2;
|
||||
}
|
||||
|
||||
message WorkerCommandReply {
|
||||
mxaccess_gateway.v1.MxCommandReply reply = 1;
|
||||
google.protobuf.Timestamp completed_timestamp = 2;
|
||||
}
|
||||
|
||||
message WorkerCancel {
|
||||
string reason = 1;
|
||||
}
|
||||
|
||||
message WorkerShutdown {
|
||||
google.protobuf.Duration grace_period = 1;
|
||||
string reason = 2;
|
||||
}
|
||||
|
||||
message WorkerShutdownAck {
|
||||
mxaccess_gateway.v1.ProtocolStatus status = 1;
|
||||
}
|
||||
|
||||
message WorkerEvent {
|
||||
mxaccess_gateway.v1.MxEvent event = 1;
|
||||
}
|
||||
|
||||
message WorkerHeartbeat {
|
||||
int32 worker_process_id = 1;
|
||||
WorkerState state = 2;
|
||||
google.protobuf.Timestamp last_sta_activity_timestamp = 3;
|
||||
uint32 pending_command_count = 4;
|
||||
uint32 outbound_event_queue_depth = 5;
|
||||
uint64 last_event_sequence = 6;
|
||||
string current_command_correlation_id = 7;
|
||||
}
|
||||
|
||||
message WorkerFault {
|
||||
WorkerFaultCategory category = 1;
|
||||
string command_method = 2;
|
||||
optional int32 hresult = 3;
|
||||
string exception_type = 4;
|
||||
string diagnostic_message = 5;
|
||||
mxaccess_gateway.v1.ProtocolStatus protocol_status = 6;
|
||||
}
|
||||
|
||||
enum WorkerState {
|
||||
WORKER_STATE_UNSPECIFIED = 0;
|
||||
WORKER_STATE_STARTING = 1;
|
||||
WORKER_STATE_HANDSHAKING = 2;
|
||||
WORKER_STATE_INITIALIZING_STA = 3;
|
||||
WORKER_STATE_READY = 4;
|
||||
WORKER_STATE_EXECUTING_COMMAND = 5;
|
||||
WORKER_STATE_SHUTTING_DOWN = 6;
|
||||
WORKER_STATE_STOPPED = 7;
|
||||
WORKER_STATE_FAULTED = 8;
|
||||
}
|
||||
|
||||
enum WorkerFaultCategory {
|
||||
WORKER_FAULT_CATEGORY_UNSPECIFIED = 0;
|
||||
WORKER_FAULT_CATEGORY_INVALID_ARGUMENTS = 1;
|
||||
WORKER_FAULT_CATEGORY_GATEWAY_AUTHENTICATION_FAILED = 2;
|
||||
WORKER_FAULT_CATEGORY_PROTOCOL_MISMATCH = 3;
|
||||
WORKER_FAULT_CATEGORY_PROTOCOL_VIOLATION = 4;
|
||||
WORKER_FAULT_CATEGORY_PIPE_DISCONNECTED = 5;
|
||||
WORKER_FAULT_CATEGORY_MXACCESS_CREATION_FAILED = 6;
|
||||
WORKER_FAULT_CATEGORY_MXACCESS_COMMAND_FAILED = 7;
|
||||
WORKER_FAULT_CATEGORY_MXACCESS_EVENT_CONVERSION_FAILED = 8;
|
||||
WORKER_FAULT_CATEGORY_STA_HUNG = 9;
|
||||
WORKER_FAULT_CATEGORY_QUEUE_OVERFLOW = 10;
|
||||
WORKER_FAULT_CATEGORY_SHUTDOWN_TIMEOUT = 11;
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public enum AuthenticationMode
|
||||
{
|
||||
ApiKey,
|
||||
Disabled
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class AuthenticationOptions
|
||||
{
|
||||
public AuthenticationMode Mode { get; init; } = AuthenticationMode.ApiKey;
|
||||
|
||||
public string SqlitePath { get; init; } = @"C:\ProgramData\MxGateway\gateway-auth.db";
|
||||
|
||||
public string PepperSecretName { get; init; } = "MxGateway:ApiKeyPepper";
|
||||
|
||||
public bool RunMigrationsOnStartup { get; init; } = true;
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class DashboardOptions
|
||||
{
|
||||
public bool Enabled { get; init; } = true;
|
||||
|
||||
public string PathBase { get; init; } = "/dashboard";
|
||||
|
||||
public bool RequireAdminScope { get; init; } = true;
|
||||
|
||||
public bool AllowAnonymousLocalhost { get; init; }
|
||||
|
||||
public int SnapshotIntervalMilliseconds { get; init; } = 1_000;
|
||||
|
||||
public int RecentFaultLimit { get; init; } = 100;
|
||||
|
||||
public int RecentSessionLimit { get; init; } = 200;
|
||||
|
||||
public bool ShowTagValues { get; init; }
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveAuthenticationConfiguration(
|
||||
string Mode,
|
||||
string SqlitePath,
|
||||
string PepperSecretName,
|
||||
bool RunMigrationsOnStartup);
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveDashboardConfiguration(
|
||||
bool Enabled,
|
||||
string PathBase,
|
||||
bool RequireAdminScope,
|
||||
bool AllowAnonymousLocalhost,
|
||||
int SnapshotIntervalMilliseconds,
|
||||
int RecentFaultLimit,
|
||||
int RecentSessionLimit,
|
||||
bool ShowTagValues);
|
||||
@@ -0,0 +1,5 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveEventConfiguration(
|
||||
int QueueCapacity,
|
||||
string BackpressurePolicy);
|
||||
@@ -0,0 +1,9 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveGatewayConfiguration(
|
||||
EffectiveAuthenticationConfiguration Authentication,
|
||||
EffectiveWorkerConfiguration Worker,
|
||||
EffectiveSessionConfiguration Sessions,
|
||||
EffectiveEventConfiguration Events,
|
||||
EffectiveDashboardConfiguration Dashboard,
|
||||
EffectiveProtocolConfiguration Protocol);
|
||||
@@ -0,0 +1,3 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveProtocolConfiguration(uint WorkerProtocolVersion);
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveSessionConfiguration(
|
||||
int DefaultCommandTimeoutSeconds,
|
||||
int MaxSessions,
|
||||
bool AllowMultipleEventSubscribers);
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveWorkerConfiguration(
|
||||
string ExecutablePath,
|
||||
string? WorkingDirectory,
|
||||
string RequiredArchitecture,
|
||||
int StartupTimeoutSeconds,
|
||||
int ShutdownTimeoutSeconds,
|
||||
int HeartbeatIntervalSeconds,
|
||||
int HeartbeatGraceSeconds,
|
||||
int MaxMessageBytes);
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public enum EventBackpressurePolicy
|
||||
{
|
||||
FailFast
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class EventOptions
|
||||
{
|
||||
public int QueueCapacity { get; init; } = 10_000;
|
||||
|
||||
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> options) : IGatewayConfigurationProvider
|
||||
{
|
||||
public const string RedactedValue = "[redacted]";
|
||||
|
||||
public EffectiveGatewayConfiguration GetEffectiveConfiguration()
|
||||
{
|
||||
GatewayOptions value = options.Value;
|
||||
|
||||
return new EffectiveGatewayConfiguration(
|
||||
Authentication: new EffectiveAuthenticationConfiguration(
|
||||
Mode: value.Authentication.Mode.ToString(),
|
||||
SqlitePath: value.Authentication.SqlitePath,
|
||||
PepperSecretName: RedactedValue,
|
||||
RunMigrationsOnStartup: value.Authentication.RunMigrationsOnStartup),
|
||||
Worker: new EffectiveWorkerConfiguration(
|
||||
ExecutablePath: value.Worker.ExecutablePath,
|
||||
WorkingDirectory: value.Worker.WorkingDirectory,
|
||||
RequiredArchitecture: value.Worker.RequiredArchitecture.ToString(),
|
||||
StartupTimeoutSeconds: value.Worker.StartupTimeoutSeconds,
|
||||
ShutdownTimeoutSeconds: value.Worker.ShutdownTimeoutSeconds,
|
||||
HeartbeatIntervalSeconds: value.Worker.HeartbeatIntervalSeconds,
|
||||
HeartbeatGraceSeconds: value.Worker.HeartbeatGraceSeconds,
|
||||
MaxMessageBytes: value.Worker.MaxMessageBytes),
|
||||
Sessions: new EffectiveSessionConfiguration(
|
||||
DefaultCommandTimeoutSeconds: value.Sessions.DefaultCommandTimeoutSeconds,
|
||||
MaxSessions: value.Sessions.MaxSessions,
|
||||
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
|
||||
Events: new EffectiveEventConfiguration(
|
||||
QueueCapacity: value.Events.QueueCapacity,
|
||||
BackpressurePolicy: value.Events.BackpressurePolicy.ToString()),
|
||||
Dashboard: new EffectiveDashboardConfiguration(
|
||||
Enabled: value.Dashboard.Enabled,
|
||||
PathBase: value.Dashboard.PathBase,
|
||||
RequireAdminScope: value.Dashboard.RequireAdminScope,
|
||||
AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost,
|
||||
SnapshotIntervalMilliseconds: value.Dashboard.SnapshotIntervalMilliseconds,
|
||||
RecentFaultLimit: value.Dashboard.RecentFaultLimit,
|
||||
RecentSessionLimit: value.Dashboard.RecentSessionLimit,
|
||||
ShowTagValues: value.Dashboard.ShowTagValues),
|
||||
Protocol: new EffectiveProtocolConfiguration(value.Protocol.WorkerProtocolVersion));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public static class GatewayConfigurationServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddGatewayConfiguration(this IServiceCollection services)
|
||||
{
|
||||
services
|
||||
.AddOptions<GatewayOptions>()
|
||||
.BindConfiguration(GatewayOptions.SectionName)
|
||||
.ValidateOnStart();
|
||||
|
||||
services.AddSingleton<IValidateOptions<GatewayOptions>, GatewayOptionsValidator>();
|
||||
services.AddSingleton<IGatewayConfigurationProvider, GatewayConfigurationProvider>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class GatewayOptions
|
||||
{
|
||||
public const string SectionName = "MxGateway";
|
||||
|
||||
public AuthenticationOptions Authentication { get; init; } = new();
|
||||
|
||||
public WorkerOptions Worker { get; init; } = new();
|
||||
|
||||
public SessionOptions Sessions { get; init; } = new();
|
||||
|
||||
public EventOptions Events { get; init; } = new();
|
||||
|
||||
public DashboardOptions Dashboard { get; init; } = new();
|
||||
|
||||
public ProtocolOptions Protocol { get; init; } = new();
|
||||
}
|
||||
@@ -0,0 +1,210 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
|
||||
{
|
||||
private const int MinimumMaxMessageBytes = 1024;
|
||||
private const int MaximumMaxMessageBytes = 256 * 1024 * 1024;
|
||||
|
||||
public ValidateOptionsResult Validate(string? name, GatewayOptions options)
|
||||
{
|
||||
List<string> failures = [];
|
||||
|
||||
ValidateAuthentication(options.Authentication, failures);
|
||||
ValidateWorker(options.Worker, failures);
|
||||
ValidateSessions(options.Sessions, failures);
|
||||
ValidateEvents(options.Events, failures);
|
||||
ValidateDashboard(options.Dashboard, failures);
|
||||
ValidateProtocol(options.Protocol, failures);
|
||||
|
||||
return failures.Count == 0
|
||||
? ValidateOptionsResult.Success
|
||||
: ValidateOptionsResult.Fail(failures);
|
||||
}
|
||||
|
||||
private static void ValidateAuthentication(AuthenticationOptions options, List<string> failures)
|
||||
{
|
||||
if (!Enum.IsDefined(options.Mode))
|
||||
{
|
||||
failures.Add("MxGateway:Authentication:Mode must be a supported authentication mode.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (options.Mode == AuthenticationMode.ApiKey)
|
||||
{
|
||||
AddIfBlank(
|
||||
options.SqlitePath,
|
||||
"MxGateway:Authentication:SqlitePath is required when API-key authentication is enabled.",
|
||||
failures);
|
||||
AddIfInvalidPath(
|
||||
options.SqlitePath,
|
||||
"MxGateway:Authentication:SqlitePath must be a valid filesystem path.",
|
||||
failures);
|
||||
AddIfBlank(
|
||||
options.PepperSecretName,
|
||||
"MxGateway:Authentication:PepperSecretName is required when API-key authentication is enabled.",
|
||||
failures);
|
||||
}
|
||||
}
|
||||
|
||||
private static void ValidateWorker(WorkerOptions options, List<string> failures)
|
||||
{
|
||||
AddIfBlank(options.ExecutablePath, "MxGateway:Worker:ExecutablePath is required.", failures);
|
||||
AddIfInvalidPath(
|
||||
options.ExecutablePath,
|
||||
"MxGateway:Worker:ExecutablePath must be a valid filesystem path.",
|
||||
failures);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(options.ExecutablePath)
|
||||
&& !string.Equals(Path.GetExtension(options.ExecutablePath), ".exe", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
failures.Add("MxGateway:Worker:ExecutablePath must point to a .exe file.");
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(options.WorkingDirectory))
|
||||
{
|
||||
AddIfInvalidPath(
|
||||
options.WorkingDirectory,
|
||||
"MxGateway:Worker:WorkingDirectory must be a valid filesystem path.",
|
||||
failures);
|
||||
}
|
||||
|
||||
if (!Enum.IsDefined(options.RequiredArchitecture))
|
||||
{
|
||||
failures.Add("MxGateway:Worker:RequiredArchitecture must be a supported worker architecture.");
|
||||
}
|
||||
|
||||
AddIfNotPositive(
|
||||
options.StartupTimeoutSeconds,
|
||||
"MxGateway:Worker:StartupTimeoutSeconds must be greater than zero.",
|
||||
failures);
|
||||
AddIfNotPositive(
|
||||
options.ShutdownTimeoutSeconds,
|
||||
"MxGateway:Worker:ShutdownTimeoutSeconds must be greater than zero.",
|
||||
failures);
|
||||
AddIfNotPositive(
|
||||
options.HeartbeatIntervalSeconds,
|
||||
"MxGateway:Worker:HeartbeatIntervalSeconds must be greater than zero.",
|
||||
failures);
|
||||
AddIfNotPositive(
|
||||
options.HeartbeatGraceSeconds,
|
||||
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than zero.",
|
||||
failures);
|
||||
|
||||
if (options.HeartbeatGraceSeconds < options.HeartbeatIntervalSeconds)
|
||||
{
|
||||
failures.Add(
|
||||
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than or equal to HeartbeatIntervalSeconds.");
|
||||
}
|
||||
|
||||
if (options.MaxMessageBytes is < MinimumMaxMessageBytes or > MaximumMaxMessageBytes)
|
||||
{
|
||||
failures.Add(
|
||||
$"MxGateway:Worker:MaxMessageBytes must be between {MinimumMaxMessageBytes} and {MaximumMaxMessageBytes}.");
|
||||
}
|
||||
}
|
||||
|
||||
private static void ValidateSessions(SessionOptions options, List<string> failures)
|
||||
{
|
||||
AddIfNotPositive(
|
||||
options.DefaultCommandTimeoutSeconds,
|
||||
"MxGateway:Sessions:DefaultCommandTimeoutSeconds must be greater than zero.",
|
||||
failures);
|
||||
AddIfNotPositive(options.MaxSessions, "MxGateway:Sessions:MaxSessions must be greater than zero.", failures);
|
||||
}
|
||||
|
||||
private static void ValidateEvents(EventOptions options, List<string> failures)
|
||||
{
|
||||
AddIfNotPositive(options.QueueCapacity, "MxGateway:Events:QueueCapacity must be greater than zero.", failures);
|
||||
|
||||
if (!Enum.IsDefined(options.BackpressurePolicy))
|
||||
{
|
||||
failures.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
|
||||
}
|
||||
}
|
||||
|
||||
private static void ValidateDashboard(DashboardOptions options, List<string> failures)
|
||||
{
|
||||
if (options.Enabled)
|
||||
{
|
||||
AddIfBlank(options.PathBase, "MxGateway:Dashboard:PathBase is required when the dashboard is enabled.", failures);
|
||||
if (!string.IsNullOrWhiteSpace(options.PathBase) && !options.PathBase.StartsWith('/'))
|
||||
{
|
||||
failures.Add("MxGateway:Dashboard:PathBase must start with '/'.");
|
||||
}
|
||||
}
|
||||
|
||||
AddIfNotPositive(
|
||||
options.SnapshotIntervalMilliseconds,
|
||||
"MxGateway:Dashboard:SnapshotIntervalMilliseconds must be greater than zero.",
|
||||
failures);
|
||||
AddIfNegative(
|
||||
options.RecentFaultLimit,
|
||||
"MxGateway:Dashboard:RecentFaultLimit must be greater than or equal to zero.",
|
||||
failures);
|
||||
AddIfNegative(
|
||||
options.RecentSessionLimit,
|
||||
"MxGateway:Dashboard:RecentSessionLimit must be greater than or equal to zero.",
|
||||
failures);
|
||||
}
|
||||
|
||||
private static void ValidateProtocol(ProtocolOptions options, List<string> failures)
|
||||
{
|
||||
if (options.WorkerProtocolVersion != GatewayContractInfo.WorkerProtocolVersion)
|
||||
{
|
||||
failures.Add(
|
||||
$"MxGateway:Protocol:WorkerProtocolVersion must be {GatewayContractInfo.WorkerProtocolVersion}.");
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddIfBlank(string? value, string message, List<string> failures)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddIfNotPositive(int value, string message, List<string> failures)
|
||||
{
|
||||
if (value <= 0)
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddIfNegative(int value, string message, List<string> failures)
|
||||
{
|
||||
if (value < 0)
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddIfInvalidPath(string? value, string message, List<string> failures)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_ = Path.GetFullPath(value);
|
||||
}
|
||||
catch (ArgumentException)
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
catch (NotSupportedException)
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
catch (PathTooLongException)
|
||||
{
|
||||
failures.Add(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public interface IGatewayConfigurationProvider
|
||||
{
|
||||
EffectiveGatewayConfiguration GetEffectiveConfiguration();
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class ProtocolOptions
|
||||
{
|
||||
public uint WorkerProtocolVersion { get; init; } = GatewayContractInfo.WorkerProtocolVersion;
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class SessionOptions
|
||||
{
|
||||
public int DefaultCommandTimeoutSeconds { get; init; } = 30;
|
||||
|
||||
public int MaxSessions { get; init; } = 64;
|
||||
|
||||
public bool AllowMultipleEventSubscribers { get; init; }
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public enum WorkerArchitecture
|
||||
{
|
||||
X86,
|
||||
X64
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
public sealed class WorkerOptions
|
||||
{
|
||||
public string ExecutablePath { get; init; } =
|
||||
@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe";
|
||||
|
||||
public string? WorkingDirectory { get; init; }
|
||||
|
||||
public WorkerArchitecture RequiredArchitecture { get; init; } = WorkerArchitecture.X86;
|
||||
|
||||
public int StartupTimeoutSeconds { get; init; } = 30;
|
||||
|
||||
public int ShutdownTimeoutSeconds { get; init; } = 10;
|
||||
|
||||
public int HeartbeatIntervalSeconds { get; init; } = 5;
|
||||
|
||||
public int HeartbeatGraceSeconds { get; init; } = 15;
|
||||
|
||||
public int MaxMessageBytes { get; init; } = 16 * 1024 * 1024;
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
namespace MxGateway.Server.Diagnostics;
|
||||
|
||||
public static class GatewayLogRedactor
|
||||
{
|
||||
public const string RedactedValue = "[redacted]";
|
||||
|
||||
private static readonly HashSet<string> SensitiveCommandMethods = new(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
"AuthenticateUser",
|
||||
"WriteSecured",
|
||||
"WriteSecured2"
|
||||
};
|
||||
|
||||
public static bool IsCredentialBearingCommand(string? commandMethod)
|
||||
{
|
||||
return commandMethod is not null
|
||||
&& SensitiveCommandMethods.Contains(commandMethod);
|
||||
}
|
||||
|
||||
public static string? RedactApiKey(string? authorizationHeader)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(authorizationHeader))
|
||||
{
|
||||
return authorizationHeader;
|
||||
}
|
||||
|
||||
const string bearerPrefix = "Bearer ";
|
||||
if (!authorizationHeader.StartsWith(bearerPrefix, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return RedactedValue;
|
||||
}
|
||||
|
||||
string token = authorizationHeader[bearerPrefix.Length..].Trim();
|
||||
|
||||
if (!token.StartsWith("mxgw_", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return $"{bearerPrefix}{RedactedValue}";
|
||||
}
|
||||
|
||||
string[] tokenParts = token.Split('_', 3, StringSplitOptions.RemoveEmptyEntries);
|
||||
if (tokenParts.Length < 2)
|
||||
{
|
||||
return $"{bearerPrefix}mxgw_{RedactedValue}";
|
||||
}
|
||||
|
||||
return $"{bearerPrefix}mxgw_{tokenParts[1]}_{RedactedValue}";
|
||||
}
|
||||
|
||||
public static string? RedactClientIdentity(string? clientIdentity)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(clientIdentity))
|
||||
{
|
||||
return clientIdentity;
|
||||
}
|
||||
|
||||
return clientIdentity.Contains("mxgw_", StringComparison.OrdinalIgnoreCase)
|
||||
? RedactApiKey(clientIdentity)
|
||||
: clientIdentity;
|
||||
}
|
||||
|
||||
public static object? RedactCommandValue(
|
||||
string? commandMethod,
|
||||
object? value,
|
||||
bool valueLoggingEnabled = false)
|
||||
{
|
||||
if (value is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!valueLoggingEnabled || IsCredentialBearingCommand(commandMethod))
|
||||
{
|
||||
return RedactedValue;
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
namespace MxGateway.Server.Diagnostics;
|
||||
|
||||
public sealed record GatewayLogScope(
|
||||
string? SessionId = null,
|
||||
int? WorkerProcessId = null,
|
||||
ulong? CorrelationId = null,
|
||||
string? CommandMethod = null,
|
||||
string? ClientIdentity = null)
|
||||
{
|
||||
public IReadOnlyDictionary<string, object?> ToDictionary()
|
||||
{
|
||||
Dictionary<string, object?> values = [];
|
||||
|
||||
AddIfPresent(values, "SessionId", SessionId);
|
||||
AddIfPresent(values, "WorkerProcessId", WorkerProcessId);
|
||||
AddIfPresent(values, "CorrelationId", CorrelationId);
|
||||
AddIfPresent(values, "CommandMethod", CommandMethod);
|
||||
AddIfPresent(values, "ClientIdentity", GatewayLogRedactor.RedactClientIdentity(ClientIdentity));
|
||||
|
||||
return values;
|
||||
}
|
||||
|
||||
private static void AddIfPresent(
|
||||
Dictionary<string, object?> values,
|
||||
string key,
|
||||
object? value)
|
||||
{
|
||||
if (value is not null)
|
||||
{
|
||||
values[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace MxGateway.Server.Diagnostics;
|
||||
|
||||
public static class GatewayLoggerExtensions
|
||||
{
|
||||
public static IDisposable? BeginGatewayScope(
|
||||
this ILogger logger,
|
||||
GatewayLogScope scope)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
ArgumentNullException.ThrowIfNull(scope);
|
||||
|
||||
return logger.BeginScope(scope.ToDictionary());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
using Microsoft.Extensions.Primitives;
|
||||
|
||||
namespace MxGateway.Server.Diagnostics;
|
||||
|
||||
public static class GatewayRequestLoggingMiddlewareExtensions
|
||||
{
|
||||
public const string SessionIdHeaderName = "x-session-id";
|
||||
public const string WorkerProcessIdHeaderName = "x-worker-process-id";
|
||||
public const string CorrelationIdHeaderName = "x-correlation-id";
|
||||
public const string CommandMethodHeaderName = "x-command-method";
|
||||
|
||||
public static IApplicationBuilder UseGatewayRequestLoggingScope(this IApplicationBuilder app)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(app);
|
||||
|
||||
return app.Use(async (context, next) =>
|
||||
{
|
||||
ILogger logger = context.RequestServices
|
||||
.GetRequiredService<ILoggerFactory>()
|
||||
.CreateLogger("MxGateway.Request");
|
||||
|
||||
using IDisposable? scope = logger.BeginGatewayScope(new GatewayLogScope(
|
||||
SessionId: ReadHeader(context, SessionIdHeaderName),
|
||||
WorkerProcessId: ReadInt32Header(context, WorkerProcessIdHeaderName),
|
||||
CorrelationId: ReadUInt64Header(context, CorrelationIdHeaderName),
|
||||
CommandMethod: ReadHeader(context, CommandMethodHeaderName),
|
||||
ClientIdentity: ReadHeader(context, "authorization")));
|
||||
|
||||
await next(context);
|
||||
});
|
||||
}
|
||||
|
||||
private static string? ReadHeader(HttpContext context, string headerName)
|
||||
{
|
||||
return context.Request.Headers.TryGetValue(headerName, out StringValues values)
|
||||
? values.ToString()
|
||||
: null;
|
||||
}
|
||||
|
||||
private static int? ReadInt32Header(HttpContext context, string headerName)
|
||||
{
|
||||
string? value = ReadHeader(context, headerName);
|
||||
|
||||
return int.TryParse(value, out int parsedValue)
|
||||
? parsedValue
|
||||
: null;
|
||||
}
|
||||
|
||||
private static ulong? ReadUInt64Header(HttpContext context, string headerName)
|
||||
{
|
||||
string? value = ReadHeader(context, headerName);
|
||||
|
||||
return ulong.TryParse(value, out ulong parsedValue)
|
||||
? parsedValue
|
||||
: null;
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,7 @@
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Diagnostics;
|
||||
using MxGateway.Server.Metrics;
|
||||
|
||||
namespace MxGateway.Server;
|
||||
|
||||
@@ -9,6 +12,7 @@ public static class GatewayApplication
|
||||
WebApplicationBuilder builder = CreateBuilder(args);
|
||||
WebApplication app = builder.Build();
|
||||
|
||||
app.UseGatewayRequestLoggingScope();
|
||||
app.MapGatewayEndpoints();
|
||||
|
||||
return app;
|
||||
@@ -18,7 +22,9 @@ public static class GatewayApplication
|
||||
{
|
||||
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
builder.Services.AddGatewayConfiguration();
|
||||
builder.Services.AddHealthChecks();
|
||||
builder.Services.AddSingleton<GatewayMetrics>();
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,306 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace MxGateway.Server.Metrics;
|
||||
|
||||
public sealed class GatewayMetrics : IDisposable
|
||||
{
|
||||
public const string MeterName = "MxGateway.Server";
|
||||
|
||||
private readonly object _syncRoot = new();
|
||||
private readonly Meter _meter;
|
||||
private readonly Counter<long> _sessionsOpenedCounter;
|
||||
private readonly Counter<long> _sessionsClosedCounter;
|
||||
private readonly Counter<long> _commandsStartedCounter;
|
||||
private readonly Counter<long> _commandsSucceededCounter;
|
||||
private readonly Counter<long> _commandsFailedCounter;
|
||||
private readonly Counter<long> _eventsReceivedCounter;
|
||||
private readonly Counter<long> _queueOverflowsCounter;
|
||||
private readonly Counter<long> _faultsCounter;
|
||||
private readonly Counter<long> _workerKillsCounter;
|
||||
private readonly Counter<long> _workerExitsCounter;
|
||||
private readonly Counter<long> _heartbeatFailuresCounter;
|
||||
private readonly Counter<long> _streamDisconnectsCounter;
|
||||
private readonly Histogram<double> _workerStartupLatencyHistogram;
|
||||
private readonly Histogram<double> _commandLatencyHistogram;
|
||||
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
|
||||
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
private int _openSessions;
|
||||
private int _workersRunning;
|
||||
private int _eventQueueDepth;
|
||||
private long _sessionsOpened;
|
||||
private long _sessionsClosed;
|
||||
private long _commandsStarted;
|
||||
private long _commandsSucceeded;
|
||||
private long _commandsFailed;
|
||||
private long _eventsReceived;
|
||||
private long _queueOverflows;
|
||||
private long _faults;
|
||||
private long _workerKills;
|
||||
private long _workerExits;
|
||||
private long _heartbeatFailures;
|
||||
private long _streamDisconnects;
|
||||
private bool _disposed;
|
||||
|
||||
public GatewayMetrics()
|
||||
{
|
||||
_meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());
|
||||
_sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
|
||||
_sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
|
||||
_commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
|
||||
_commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
|
||||
_commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
|
||||
_eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
|
||||
_queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
|
||||
_faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
|
||||
_workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
|
||||
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
|
||||
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
|
||||
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
|
||||
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
|
||||
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
|
||||
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
|
||||
|
||||
_meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
|
||||
_meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
|
||||
_meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth);
|
||||
}
|
||||
|
||||
public void SessionOpened()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_openSessions++;
|
||||
_sessionsOpened++;
|
||||
}
|
||||
|
||||
_sessionsOpenedCounter.Add(1);
|
||||
}
|
||||
|
||||
public void SessionClosed()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
if (_openSessions > 0)
|
||||
{
|
||||
_openSessions--;
|
||||
}
|
||||
|
||||
_sessionsClosed++;
|
||||
}
|
||||
|
||||
_sessionsClosedCounter.Add(1);
|
||||
}
|
||||
|
||||
public void WorkerStarted(TimeSpan startupDuration)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_workersRunning++;
|
||||
}
|
||||
|
||||
_workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
|
||||
}
|
||||
|
||||
public void WorkerStopped(string reason)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
if (_workersRunning > 0)
|
||||
{
|
||||
_workersRunning--;
|
||||
}
|
||||
|
||||
_workerExits++;
|
||||
}
|
||||
|
||||
_workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||
}
|
||||
|
||||
public void WorkerKilled(string reason)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_workerKills++;
|
||||
}
|
||||
|
||||
_workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||
}
|
||||
|
||||
public void CommandStarted(string method)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_commandsStarted++;
|
||||
}
|
||||
|
||||
_commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
|
||||
}
|
||||
|
||||
public void CommandSucceeded(string method, TimeSpan duration)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_commandsSucceeded++;
|
||||
}
|
||||
|
||||
KeyValuePair<string, object?> methodTag = new("method", method);
|
||||
_commandsSucceededCounter.Add(1, methodTag);
|
||||
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
|
||||
}
|
||||
|
||||
public void CommandFailed(string method, string category, TimeSpan duration)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_commandsFailed++;
|
||||
Increment(_commandFailuresByMethod, method);
|
||||
}
|
||||
|
||||
KeyValuePair<string, object?> methodTag = new("method", method);
|
||||
KeyValuePair<string, object?> categoryTag = new("category", category);
|
||||
_commandsFailedCounter.Add(1, methodTag, categoryTag);
|
||||
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
|
||||
}
|
||||
|
||||
public void EventReceived(string sessionId, string family)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_eventsReceived++;
|
||||
Increment(_eventsByFamily, family);
|
||||
}
|
||||
|
||||
_eventsReceivedCounter.Add(
|
||||
1,
|
||||
new KeyValuePair<string, object?>("session_id", sessionId),
|
||||
new KeyValuePair<string, object?>("family", family));
|
||||
}
|
||||
|
||||
public void RecordEventStreamSend(string family, TimeSpan duration)
|
||||
{
|
||||
_eventStreamSendLatencyHistogram.Record(
|
||||
duration.TotalMilliseconds,
|
||||
new KeyValuePair<string, object?>("family", family));
|
||||
}
|
||||
|
||||
public void SetEventQueueDepth(int depth)
|
||||
{
|
||||
if (depth < 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
|
||||
}
|
||||
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_eventQueueDepth = depth;
|
||||
}
|
||||
}
|
||||
|
||||
public void QueueOverflow(string queueName)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_queueOverflows++;
|
||||
}
|
||||
|
||||
_queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
|
||||
}
|
||||
|
||||
public void Fault(string category)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_faults++;
|
||||
}
|
||||
|
||||
_faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
|
||||
}
|
||||
|
||||
public void HeartbeatFailed(string sessionId)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_heartbeatFailures++;
|
||||
}
|
||||
|
||||
_heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
|
||||
}
|
||||
|
||||
public void StreamDisconnected(string reason)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_streamDisconnects++;
|
||||
}
|
||||
|
||||
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||
}
|
||||
|
||||
public GatewayMetricsSnapshot GetSnapshot()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
return new GatewayMetricsSnapshot(
|
||||
OpenSessions: _openSessions,
|
||||
WorkersRunning: _workersRunning,
|
||||
EventQueueDepth: _eventQueueDepth,
|
||||
SessionsOpened: _sessionsOpened,
|
||||
SessionsClosed: _sessionsClosed,
|
||||
CommandsStarted: _commandsStarted,
|
||||
CommandsSucceeded: _commandsSucceeded,
|
||||
CommandsFailed: _commandsFailed,
|
||||
EventsReceived: _eventsReceived,
|
||||
QueueOverflows: _queueOverflows,
|
||||
Faults: _faults,
|
||||
WorkerKills: _workerKills,
|
||||
WorkerExits: _workerExits,
|
||||
HeartbeatFailures: _heartbeatFailures,
|
||||
StreamDisconnects: _streamDisconnects,
|
||||
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
|
||||
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_meter.Dispose();
|
||||
_disposed = true;
|
||||
}
|
||||
|
||||
private int GetOpenSessions()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
return _openSessions;
|
||||
}
|
||||
}
|
||||
|
||||
private int GetWorkersRunning()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
return _workersRunning;
|
||||
}
|
||||
}
|
||||
|
||||
private int GetEventQueueDepth()
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
return _eventQueueDepth;
|
||||
}
|
||||
}
|
||||
|
||||
private static void Increment(Dictionary<string, long> values, string key)
|
||||
{
|
||||
values.TryGetValue(key, out long currentValue);
|
||||
values[key] = currentValue + 1;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
namespace MxGateway.Server.Metrics;
|
||||
|
||||
public sealed record GatewayMetricsSnapshot(
|
||||
int OpenSessions,
|
||||
int WorkersRunning,
|
||||
int EventQueueDepth,
|
||||
long SessionsOpened,
|
||||
long SessionsClosed,
|
||||
long CommandsStarted,
|
||||
long CommandsSucceeded,
|
||||
long CommandsFailed,
|
||||
long EventsReceived,
|
||||
long QueueOverflows,
|
||||
long Faults,
|
||||
long WorkerKills,
|
||||
long WorkerExits,
|
||||
long HeartbeatFailures,
|
||||
long StreamDisconnects,
|
||||
IReadOnlyDictionary<string, long> CommandFailuresByMethod,
|
||||
IReadOnlyDictionary<string, long> EventsByFamily);
|
||||
@@ -0,0 +1,32 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
internal static class WorkerEnvelopeValidator
|
||||
{
|
||||
public static void Validate(
|
||||
WorkerEnvelope envelope,
|
||||
WorkerFrameProtocolOptions options)
|
||||
{
|
||||
if (envelope.ProtocolVersion != options.ProtocolVersion)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
|
||||
}
|
||||
|
||||
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.SessionMismatch,
|
||||
"Worker envelope session id does not match the owning gateway session.");
|
||||
}
|
||||
|
||||
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||
"Worker envelope must include a typed body.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
public enum WorkerFrameProtocolErrorCode
|
||||
{
|
||||
Unknown = 0,
|
||||
InvalidConfiguration = 1,
|
||||
EndOfStream = 2,
|
||||
MalformedLength = 3,
|
||||
MessageTooLarge = 4,
|
||||
InvalidEnvelope = 5,
|
||||
ProtocolVersionMismatch = 6,
|
||||
SessionMismatch = 7,
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
public sealed class WorkerFrameProtocolException : Exception
|
||||
{
|
||||
public WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode errorCode,
|
||||
string message)
|
||||
: base(message)
|
||||
{
|
||||
ErrorCode = errorCode;
|
||||
}
|
||||
|
||||
public WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode errorCode,
|
||||
string message,
|
||||
Exception innerException)
|
||||
: base(message, innerException)
|
||||
{
|
||||
ErrorCode = errorCode;
|
||||
}
|
||||
|
||||
public WorkerFrameProtocolErrorCode ErrorCode { get; }
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
public sealed class WorkerFrameProtocolOptions
|
||||
{
|
||||
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
|
||||
|
||||
public WorkerFrameProtocolOptions(string sessionId)
|
||||
: this(
|
||||
sessionId,
|
||||
GatewayContractInfo.WorkerProtocolVersion,
|
||||
DefaultMaxMessageBytes)
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerFrameProtocolOptions(
|
||||
string sessionId,
|
||||
uint protocolVersion,
|
||||
int maxMessageBytes)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||
"Worker frame protocol requires a session id.");
|
||||
}
|
||||
|
||||
if (protocolVersion == 0)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||
"Worker frame protocol requires a non-zero protocol version.");
|
||||
}
|
||||
|
||||
if (maxMessageBytes <= 0)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||
"Worker frame protocol max message size must be greater than zero.");
|
||||
}
|
||||
|
||||
SessionId = sessionId;
|
||||
ProtocolVersion = protocolVersion;
|
||||
MaxMessageBytes = maxMessageBytes;
|
||||
}
|
||||
|
||||
public string SessionId { get; }
|
||||
|
||||
public uint ProtocolVersion { get; }
|
||||
|
||||
public int MaxMessageBytes { get; }
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
using System.Buffers.Binary;
|
||||
using Google.Protobuf;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
public sealed class WorkerFrameReader
|
||||
{
|
||||
private readonly WorkerFrameProtocolOptions _options;
|
||||
private readonly Stream _stream;
|
||||
|
||||
public WorkerFrameReader(
|
||||
Stream stream,
|
||||
WorkerFrameProtocolOptions options)
|
||||
{
|
||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
}
|
||||
|
||||
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
|
||||
if (payloadLength == 0)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.MalformedLength,
|
||||
"Worker frame payload length must be greater than zero.");
|
||||
}
|
||||
|
||||
if (payloadLength > _options.MaxMessageBytes)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||
}
|
||||
|
||||
byte[] payload = new byte[payloadLength];
|
||||
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
WorkerEnvelope envelope;
|
||||
try
|
||||
{
|
||||
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
|
||||
}
|
||||
catch (InvalidProtocolBufferException exception)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
|
||||
exception);
|
||||
}
|
||||
|
||||
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||
|
||||
return envelope;
|
||||
}
|
||||
|
||||
private async ValueTask ReadExactlyOrThrowAsync(
|
||||
Memory<byte> buffer,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (EndOfStreamException exception)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.EndOfStream,
|
||||
"Worker frame ended before the expected number of bytes were read.",
|
||||
exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
using System.Buffers.Binary;
|
||||
using Google.Protobuf;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Workers;
|
||||
|
||||
public sealed class WorkerFrameWriter
|
||||
{
|
||||
private readonly WorkerFrameProtocolOptions _options;
|
||||
private readonly Stream _stream;
|
||||
|
||||
public WorkerFrameWriter(
|
||||
Stream stream,
|
||||
WorkerFrameProtocolOptions options)
|
||||
{
|
||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
}
|
||||
|
||||
public async ValueTask WriteAsync(
|
||||
WorkerEnvelope envelope,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(envelope);
|
||||
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||
|
||||
int payloadLength = envelope.CalculateSize();
|
||||
if (payloadLength == 0)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||
"Worker envelope cannot serialize to an empty payload.");
|
||||
}
|
||||
|
||||
if (payloadLength > _options.MaxMessageBytes)
|
||||
{
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||
}
|
||||
|
||||
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
|
||||
|
||||
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
@@ -5,5 +5,44 @@
|
||||
"Microsoft.AspNetCore": "Warning"
|
||||
}
|
||||
},
|
||||
"AllowedHosts": "*"
|
||||
"AllowedHosts": "*",
|
||||
"MxGateway": {
|
||||
"Authentication": {
|
||||
"Mode": "ApiKey",
|
||||
"SqlitePath": "C:\\ProgramData\\MxGateway\\gateway-auth.db",
|
||||
"PepperSecretName": "MxGateway:ApiKeyPepper",
|
||||
"RunMigrationsOnStartup": true
|
||||
},
|
||||
"Worker": {
|
||||
"ExecutablePath": "src\\MxGateway.Worker\\bin\\x86\\Release\\MxGateway.Worker.exe",
|
||||
"RequiredArchitecture": "X86",
|
||||
"StartupTimeoutSeconds": 30,
|
||||
"ShutdownTimeoutSeconds": 10,
|
||||
"HeartbeatIntervalSeconds": 5,
|
||||
"HeartbeatGraceSeconds": 15,
|
||||
"MaxMessageBytes": 16777216
|
||||
},
|
||||
"Sessions": {
|
||||
"DefaultCommandTimeoutSeconds": 30,
|
||||
"MaxSessions": 64,
|
||||
"AllowMultipleEventSubscribers": false
|
||||
},
|
||||
"Events": {
|
||||
"QueueCapacity": 10000,
|
||||
"BackpressurePolicy": "FailFast"
|
||||
},
|
||||
"Dashboard": {
|
||||
"Enabled": true,
|
||||
"PathBase": "/dashboard",
|
||||
"RequireAdminScope": true,
|
||||
"AllowAnonymousLocalhost": false,
|
||||
"SnapshotIntervalMilliseconds": 1000,
|
||||
"RecentFaultLimit": 100,
|
||||
"RecentSessionLimit": 200,
|
||||
"ShowTagValues": false
|
||||
},
|
||||
"Protocol": {
|
||||
"WorkerProtocolVersion": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Server.Configuration;
|
||||
|
||||
namespace MxGateway.Tests.Configuration;
|
||||
|
||||
public sealed class GatewayOptionsTests
|
||||
{
|
||||
[Fact]
|
||||
public void OptionsBinding_UsesDesignDefaults()
|
||||
{
|
||||
GatewayOptions options = BindOptions(new Dictionary<string, string?>());
|
||||
|
||||
Assert.Equal(AuthenticationMode.ApiKey, options.Authentication.Mode);
|
||||
Assert.Equal(@"C:\ProgramData\MxGateway\gateway-auth.db", options.Authentication.SqlitePath);
|
||||
Assert.Equal("MxGateway:ApiKeyPepper", options.Authentication.PepperSecretName);
|
||||
Assert.True(options.Authentication.RunMigrationsOnStartup);
|
||||
|
||||
Assert.Equal(@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe", options.Worker.ExecutablePath);
|
||||
Assert.Equal(WorkerArchitecture.X86, options.Worker.RequiredArchitecture);
|
||||
Assert.Equal(30, options.Worker.StartupTimeoutSeconds);
|
||||
Assert.Equal(10, options.Worker.ShutdownTimeoutSeconds);
|
||||
Assert.Equal(5, options.Worker.HeartbeatIntervalSeconds);
|
||||
Assert.Equal(15, options.Worker.HeartbeatGraceSeconds);
|
||||
Assert.Equal(16 * 1024 * 1024, options.Worker.MaxMessageBytes);
|
||||
|
||||
Assert.Equal(30, options.Sessions.DefaultCommandTimeoutSeconds);
|
||||
Assert.Equal(64, options.Sessions.MaxSessions);
|
||||
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
|
||||
|
||||
Assert.Equal(10_000, options.Events.QueueCapacity);
|
||||
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
|
||||
|
||||
Assert.True(options.Dashboard.Enabled);
|
||||
Assert.Equal("/dashboard", options.Dashboard.PathBase);
|
||||
Assert.True(options.Dashboard.RequireAdminScope);
|
||||
Assert.False(options.Dashboard.AllowAnonymousLocalhost);
|
||||
Assert.Equal(1_000, options.Dashboard.SnapshotIntervalMilliseconds);
|
||||
Assert.Equal(100, options.Dashboard.RecentFaultLimit);
|
||||
Assert.Equal(200, options.Dashboard.RecentSessionLimit);
|
||||
Assert.False(options.Dashboard.ShowTagValues);
|
||||
|
||||
Assert.Equal(1u, options.Protocol.WorkerProtocolVersion);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OptionsBinding_AppliesConfigurationOverrides()
|
||||
{
|
||||
GatewayOptions options = BindOptions(
|
||||
new Dictionary<string, string?>
|
||||
{
|
||||
["MxGateway:Authentication:Mode"] = "Disabled",
|
||||
["MxGateway:Worker:ExecutablePath"] = @"C:\Gateway\MxGateway.Worker.exe",
|
||||
["MxGateway:Sessions:MaxSessions"] = "12",
|
||||
["MxGateway:Events:QueueCapacity"] = "256",
|
||||
["MxGateway:Dashboard:Enabled"] = "false"
|
||||
});
|
||||
|
||||
Assert.Equal(AuthenticationMode.Disabled, options.Authentication.Mode);
|
||||
Assert.Equal(@"C:\Gateway\MxGateway.Worker.exe", options.Worker.ExecutablePath);
|
||||
Assert.Equal(12, options.Sessions.MaxSessions);
|
||||
Assert.Equal(256, options.Events.QueueCapacity);
|
||||
Assert.False(options.Dashboard.Enabled);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("MxGateway:Worker:ExecutablePath", "worker.dll", "MxGateway:Worker:ExecutablePath must point to a .exe file.")]
|
||||
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
|
||||
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
|
||||
[InlineData("MxGateway:Dashboard:PathBase", "dashboard", "MxGateway:Dashboard:PathBase must start with '/'.")]
|
||||
public void Validation_InvalidConfiguration_FailsClearly(string key, string value, string expectedFailure)
|
||||
{
|
||||
OptionsValidationException exception = Assert.Throws<OptionsValidationException>(() =>
|
||||
_ = BindOptions(new Dictionary<string, string?> { [key] = value }));
|
||||
|
||||
Assert.Contains(exception.Failures, failure => failure.Contains(expectedFailure, StringComparison.Ordinal));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void EffectiveConfiguration_RedactsPepperSecretName()
|
||||
{
|
||||
using ServiceProvider services = BuildServices(
|
||||
new Dictionary<string, string?>
|
||||
{
|
||||
["MxGateway:Authentication:PepperSecretName"] = "RawPepperSecretName"
|
||||
});
|
||||
|
||||
IGatewayConfigurationProvider provider = services.GetRequiredService<IGatewayConfigurationProvider>();
|
||||
|
||||
EffectiveGatewayConfiguration configuration = provider.GetEffectiveConfiguration();
|
||||
|
||||
Assert.Equal(GatewayConfigurationProvider.RedactedValue, configuration.Authentication.PepperSecretName);
|
||||
Assert.DoesNotContain(
|
||||
"RawPepperSecretName",
|
||||
System.Text.Json.JsonSerializer.Serialize(configuration),
|
||||
StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
private static GatewayOptions BindOptions(IReadOnlyDictionary<string, string?> configurationValues)
|
||||
{
|
||||
using ServiceProvider services = BuildServices(configurationValues);
|
||||
|
||||
return services.GetRequiredService<IOptions<GatewayOptions>>().Value;
|
||||
}
|
||||
|
||||
private static ServiceProvider BuildServices(IReadOnlyDictionary<string, string?> configurationValues)
|
||||
{
|
||||
IConfigurationRoot configuration = new ConfigurationBuilder()
|
||||
.AddInMemoryCollection(configurationValues)
|
||||
.Build();
|
||||
|
||||
ServiceCollection services = new();
|
||||
services.AddSingleton<IConfiguration>(configuration);
|
||||
services.AddGatewayConfiguration();
|
||||
|
||||
return services.BuildServiceProvider(validateScopes: true);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
using Google.Protobuf;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Tests.Contracts;
|
||||
|
||||
public sealed class ProtobufContractRoundTripTests
|
||||
{
|
||||
[Fact]
|
||||
public void GatewayDescriptor_ContainsInitialPublicServiceMethods()
|
||||
{
|
||||
var service = Assert.Single(
|
||||
MxaccessGatewayReflection.Descriptor.Services,
|
||||
descriptor => descriptor.Name == "MxAccessGateway");
|
||||
|
||||
Assert.Contains(service.Methods, method => method.Name == "OpenSession");
|
||||
Assert.Contains(service.Methods, method => method.Name == "CloseSession");
|
||||
Assert.Contains(service.Methods, method => method.Name == "Invoke");
|
||||
Assert.Contains(service.Methods, method => method.Name == "StreamEvents");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void WorkerEnvelopeDescriptor_ContainsRequiredCorrelationFields()
|
||||
{
|
||||
var fields = WorkerEnvelope.Descriptor.Fields.InDeclarationOrder();
|
||||
|
||||
Assert.Contains(fields, field => field.Name == "protocol_version");
|
||||
Assert.Contains(fields, field => field.Name == "session_id");
|
||||
Assert.Contains(fields, field => field.Name == "sequence");
|
||||
Assert.Contains(fields, field => field.Name == "correlation_id");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CommandRequest_RoundTripsMethodSpecificPayload()
|
||||
{
|
||||
var original = new MxCommandRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "client-correlation-1",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand
|
||||
{
|
||||
ClientName = "mxaccessgw-test-client",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
var parsed = MxCommandRequest.Parser.ParseFrom(original.ToByteArray());
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
Assert.Equal(MxCommand.PayloadOneofCase.Register, parsed.Command.PayloadCase);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CommandReply_RoundTripsHResultReturnValueOutParamsAndStatuses()
|
||||
{
|
||||
var original = new MxCommandReply
|
||||
{
|
||||
SessionId = "session-1",
|
||||
CorrelationId = "gateway-correlation-1",
|
||||
Kind = MxCommandKind.AddItem,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
},
|
||||
Hresult = 0,
|
||||
ReturnValue = new MxValue
|
||||
{
|
||||
DataType = MxDataType.Integer,
|
||||
Int32Value = 1234,
|
||||
VariantType = "VT_I4",
|
||||
},
|
||||
AddItem = new AddItemReply
|
||||
{
|
||||
ItemHandle = 1234,
|
||||
},
|
||||
};
|
||||
original.Statuses.Add(new MxStatusProxy
|
||||
{
|
||||
Success = 1,
|
||||
Category = MxStatusCategory.Ok,
|
||||
DetectedBy = MxStatusSource.RespondingLmx,
|
||||
Detail = 0,
|
||||
});
|
||||
|
||||
var parsed = MxCommandReply.Parser.ParseFrom(original.ToByteArray());
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
Assert.True(parsed.HasHresult);
|
||||
Assert.Equal(MxCommandReply.PayloadOneofCase.AddItem, parsed.PayloadCase);
|
||||
Assert.Single(parsed.Statuses);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Event_RoundTripsValueStatusSequenceAndBufferedBody()
|
||||
{
|
||||
var timestamp = Timestamp.FromDateTime(new DateTime(2026, 4, 26, 20, 0, 0, DateTimeKind.Utc));
|
||||
var original = new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnBufferedDataChange,
|
||||
SessionId = "session-1",
|
||||
ServerHandle = 10,
|
||||
ItemHandle = 20,
|
||||
Value = new MxValue
|
||||
{
|
||||
DataType = MxDataType.Float,
|
||||
ArrayValue = new MxArray
|
||||
{
|
||||
ElementDataType = MxDataType.Float,
|
||||
FloatValues = new FloatArray
|
||||
{
|
||||
Values = { 1.5f, 2.5f },
|
||||
},
|
||||
Dimensions = { 2 },
|
||||
VariantType = "VT_ARRAY|VT_R4",
|
||||
},
|
||||
},
|
||||
Quality = 192,
|
||||
SourceTimestamp = timestamp,
|
||||
WorkerSequence = 42,
|
||||
WorkerTimestamp = timestamp,
|
||||
GatewayReceiveTimestamp = timestamp,
|
||||
OnBufferedDataChange = new OnBufferedDataChangeEvent
|
||||
{
|
||||
DataType = MxDataType.Float,
|
||||
QualityValues = new MxArray
|
||||
{
|
||||
ElementDataType = MxDataType.Integer,
|
||||
Int32Values = new Int32Array
|
||||
{
|
||||
Values = { 192, 192 },
|
||||
},
|
||||
Dimensions = { 2 },
|
||||
},
|
||||
TimestampValues = new MxArray
|
||||
{
|
||||
ElementDataType = MxDataType.Time,
|
||||
TimestampValues = new TimestampArray
|
||||
{
|
||||
Values = { timestamp, timestamp },
|
||||
},
|
||||
Dimensions = { 2 },
|
||||
},
|
||||
},
|
||||
};
|
||||
original.Statuses.Add(new MxStatusProxy
|
||||
{
|
||||
Success = 1,
|
||||
Category = MxStatusCategory.Ok,
|
||||
DetectedBy = MxStatusSource.RespondingNmx,
|
||||
Detail = 0,
|
||||
});
|
||||
|
||||
var parsed = MxEvent.Parser.ParseFrom(original.ToByteArray());
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
Assert.Equal(MxEvent.BodyOneofCase.OnBufferedDataChange, parsed.BodyCase);
|
||||
Assert.Single(parsed.Statuses);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void WorkerEnvelope_RoundTripsProtocolFieldsAndCommandBody()
|
||||
{
|
||||
var original = new WorkerEnvelope
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
SessionId = "session-1",
|
||||
Sequence = 7,
|
||||
CorrelationId = "gateway-correlation-1",
|
||||
WorkerCommand = new WorkerCommand
|
||||
{
|
||||
EnqueueTimestamp = Timestamp.FromDateTime(
|
||||
new DateTime(2026, 4, 26, 20, 5, 0, DateTimeKind.Utc)),
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Advise,
|
||||
Advise = new AdviseCommand
|
||||
{
|
||||
ServerHandle = 10,
|
||||
ItemHandle = 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
var parsed = WorkerEnvelope.Parser.ParseFrom(original.ToByteArray());
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, parsed.BodyCase);
|
||||
Assert.Equal(MxCommand.PayloadOneofCase.Advise, parsed.WorkerCommand.Command.PayloadCase);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
using MxGateway.Server.Diagnostics;
|
||||
|
||||
namespace MxGateway.Tests.Diagnostics;
|
||||
|
||||
public sealed class GatewayLogRedactorTests
|
||||
{
|
||||
[Fact]
|
||||
public void RedactApiKey_PreservesKeyIdAndRemovesSecret()
|
||||
{
|
||||
string? redacted = GatewayLogRedactor.RedactApiKey("Bearer mxgw_operator01_super-secret");
|
||||
|
||||
Assert.Equal("Bearer mxgw_operator01_[redacted]", redacted);
|
||||
Assert.DoesNotContain("super-secret", redacted);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("AuthenticateUser")]
|
||||
[InlineData("WriteSecured")]
|
||||
[InlineData("WriteSecured2")]
|
||||
public void IsCredentialBearingCommand_IdentifiesSensitiveMxAccessCommands(string commandMethod)
|
||||
{
|
||||
Assert.True(GatewayLogRedactor.IsCredentialBearingCommand(commandMethod));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RedactCommandValue_DoesNotLogRawValuesByDefault()
|
||||
{
|
||||
object? redacted = GatewayLogRedactor.RedactCommandValue("Write", "plaintext-tag-value");
|
||||
|
||||
Assert.Equal("[redacted]", redacted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RedactCommandValue_RedactsSecuredWriteEvenWhenValueLoggingIsEnabled()
|
||||
{
|
||||
object? redacted = GatewayLogRedactor.RedactCommandValue(
|
||||
"WriteSecured",
|
||||
"credential-bearing-value",
|
||||
valueLoggingEnabled: true);
|
||||
|
||||
Assert.Equal("[redacted]", redacted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RedactCommandValue_AllowsNonSensitiveValueOnlyWhenValueLoggingIsEnabled()
|
||||
{
|
||||
object? redacted = GatewayLogRedactor.RedactCommandValue(
|
||||
"Write",
|
||||
"diagnostic-value",
|
||||
valueLoggingEnabled: true);
|
||||
|
||||
Assert.Equal("diagnostic-value", redacted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogScope_RedactsClientIdentityBeforeScopeStateIsCreated()
|
||||
{
|
||||
GatewayLogScope scope = new(
|
||||
SessionId: "session-1",
|
||||
WorkerProcessId: 1234,
|
||||
CorrelationId: 99,
|
||||
CommandMethod: "AuthenticateUser",
|
||||
ClientIdentity: "Bearer mxgw_admin_secret");
|
||||
|
||||
IReadOnlyDictionary<string, object?> values = scope.ToDictionary();
|
||||
|
||||
Assert.Equal("session-1", values["SessionId"]);
|
||||
Assert.Equal(1234, values["WorkerProcessId"]);
|
||||
Assert.Equal((ulong)99, values["CorrelationId"]);
|
||||
Assert.Equal("AuthenticateUser", values["CommandMethod"]);
|
||||
Assert.Equal("Bearer mxgw_admin_[redacted]", values["ClientIdentity"]);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Server;
|
||||
using MxGateway.Server.Metrics;
|
||||
|
||||
namespace MxGateway.Tests.Gateway;
|
||||
|
||||
@@ -19,4 +22,47 @@ public sealed class GatewayApplicationTests
|
||||
|
||||
Assert.Equal("LiveHealth", endpoint.Metadata.GetMetadata<IEndpointNameMetadata>()?.EndpointName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Build_RegistersGatewayMetrics()
|
||||
{
|
||||
WebApplication app = GatewayApplication.Build([]);
|
||||
|
||||
GatewayMetrics metrics = app.Services.GetRequiredService<GatewayMetrics>();
|
||||
|
||||
Assert.NotNull(metrics);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(
|
||||
"MxGateway:Worker:ExecutablePath",
|
||||
"worker.dll",
|
||||
"MxGateway:Worker:ExecutablePath must point to a .exe file.")]
|
||||
[InlineData(
|
||||
"MxGateway:Events:QueueCapacity",
|
||||
"0",
|
||||
"MxGateway:Events:QueueCapacity must be greater than zero.")]
|
||||
[InlineData(
|
||||
"MxGateway:Authentication:PepperSecretName",
|
||||
"",
|
||||
"MxGateway:Authentication:PepperSecretName is required")]
|
||||
[InlineData(
|
||||
"MxGateway:Dashboard:PathBase",
|
||||
"dashboard",
|
||||
"MxGateway:Dashboard:PathBase must start with '/'.")]
|
||||
public async Task StartAsync_InvalidGatewayConfiguration_FailsStartup(
|
||||
string key,
|
||||
string value,
|
||||
string expectedFailure)
|
||||
{
|
||||
await using WebApplication app = GatewayApplication.Build(
|
||||
[$"--{key}={value}", "--urls=http://127.0.0.1:0"]);
|
||||
|
||||
OptionsValidationException exception = await Assert.ThrowsAsync<OptionsValidationException>(
|
||||
() => app.StartAsync());
|
||||
|
||||
Assert.Contains(
|
||||
exception.Failures,
|
||||
failure => failure.Contains(expectedFailure, StringComparison.Ordinal));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
using System.Buffers.Binary;
|
||||
using Google.Protobuf;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Workers;
|
||||
|
||||
public sealed class WorkerFrameProtocolTests
|
||||
{
|
||||
private const string SessionId = "session-1";
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
await using MemoryStream stream = new();
|
||||
WorkerEnvelope original = CreateEnvelope();
|
||||
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
await writer.WriteAsync(original);
|
||||
stream.Position = 0;
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
WorkerEnvelope original = CreateEnvelope();
|
||||
byte[] frame = CreateFrame(original);
|
||||
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||
|
||||
Assert.Equal(original, parsed);
|
||||
Assert.True(stream.ReadCallCount > 2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
await using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
|
||||
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
|
||||
await using MemoryStream stream = new(lengthPrefix);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
WorkerEnvelope envelope = CreateEnvelope();
|
||||
envelope.ProtocolVersion++;
|
||||
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
WorkerEnvelope envelope = CreateEnvelope();
|
||||
envelope.SessionId = "different-session";
|
||||
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
byte[] frame = CreateFrame([0x80]);
|
||||
await using MemoryStream stream = new(frame);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId);
|
||||
WorkerEnvelope envelope = CreateEnvelope();
|
||||
envelope.ClearBody();
|
||||
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await reader.ReadAsync());
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
|
||||
await using MemoryStream stream = new();
|
||||
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||
async () => await writer.WriteAsync(CreateEnvelope()));
|
||||
|
||||
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||
Assert.Equal(0, stream.Length);
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateEnvelope()
|
||||
{
|
||||
return new WorkerEnvelope
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
SessionId = SessionId,
|
||||
Sequence = 1,
|
||||
CorrelationId = "correlation-1",
|
||||
WorkerHello = new WorkerHello
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
Nonce = "nonce",
|
||||
WorkerProcessId = 1234,
|
||||
WorkerVersion = "test-worker",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static byte[] CreateFrame(IMessage message)
|
||||
{
|
||||
return CreateFrame(message.ToByteArray());
|
||||
}
|
||||
|
||||
private static byte[] CreateFrame(byte[] payload)
|
||||
{
|
||||
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
|
||||
payload.CopyTo(frame.AsSpan(sizeof(uint)));
|
||||
|
||||
return frame;
|
||||
}
|
||||
|
||||
private sealed class ChunkedReadStream : MemoryStream
|
||||
{
|
||||
private readonly int _chunkSize;
|
||||
|
||||
public ChunkedReadStream(
|
||||
byte[] buffer,
|
||||
int chunkSize)
|
||||
: base(buffer)
|
||||
{
|
||||
_chunkSize = chunkSize;
|
||||
}
|
||||
|
||||
public int ReadCallCount { get; private set; }
|
||||
|
||||
public override ValueTask<int> ReadAsync(
|
||||
Memory<byte> buffer,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ReadCallCount++;
|
||||
int requestedCount = Math.Min(buffer.Length, _chunkSize);
|
||||
|
||||
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
using MxGateway.Server.Metrics;
|
||||
|
||||
namespace MxGateway.Tests.Metrics;
|
||||
|
||||
public sealed class GatewayMetricsTests
|
||||
{
|
||||
[Fact]
|
||||
public void GetSnapshot_ReflectsSessionWorkerCommandEventAndFaultUpdates()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
|
||||
metrics.SessionOpened();
|
||||
metrics.WorkerStarted(TimeSpan.FromMilliseconds(250));
|
||||
metrics.CommandStarted("Register");
|
||||
metrics.CommandSucceeded("Register", TimeSpan.FromMilliseconds(10));
|
||||
metrics.CommandStarted("WriteSecured");
|
||||
metrics.CommandFailed("WriteSecured", "AuthorizationFailed", TimeSpan.FromMilliseconds(12));
|
||||
metrics.EventReceived("session-1", "OnDataChange");
|
||||
metrics.EventReceived("session-1", "OnDataChange");
|
||||
metrics.SetEventQueueDepth(7);
|
||||
metrics.QueueOverflow("session-events");
|
||||
metrics.Fault("CommandTimeout");
|
||||
metrics.WorkerKilled("CommandTimeout");
|
||||
metrics.WorkerStopped("Killed");
|
||||
metrics.HeartbeatFailed("session-1");
|
||||
metrics.StreamDisconnected("ClientCancelled");
|
||||
metrics.SessionClosed();
|
||||
|
||||
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
|
||||
|
||||
Assert.Equal(0, snapshot.OpenSessions);
|
||||
Assert.Equal(0, snapshot.WorkersRunning);
|
||||
Assert.Equal(7, snapshot.EventQueueDepth);
|
||||
Assert.Equal(1, snapshot.SessionsOpened);
|
||||
Assert.Equal(1, snapshot.SessionsClosed);
|
||||
Assert.Equal(2, snapshot.CommandsStarted);
|
||||
Assert.Equal(1, snapshot.CommandsSucceeded);
|
||||
Assert.Equal(1, snapshot.CommandsFailed);
|
||||
Assert.Equal(2, snapshot.EventsReceived);
|
||||
Assert.Equal(1, snapshot.QueueOverflows);
|
||||
Assert.Equal(1, snapshot.Faults);
|
||||
Assert.Equal(1, snapshot.WorkerKills);
|
||||
Assert.Equal(1, snapshot.WorkerExits);
|
||||
Assert.Equal(1, snapshot.HeartbeatFailures);
|
||||
Assert.Equal(1, snapshot.StreamDisconnects);
|
||||
Assert.Equal(1, snapshot.CommandFailuresByMethod["WriteSecured"]);
|
||||
Assert.Equal(2, snapshot.EventsByFamily["OnDataChange"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetEventQueueDepth_RejectsNegativeDepth()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
|
||||
ArgumentOutOfRangeException exception = Assert.Throws<ArgumentOutOfRangeException>(
|
||||
() => metrics.SetEventQueueDepth(-1));
|
||||
|
||||
Assert.Equal("depth", exception.ParamName);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user