95b5b09a67
Adds a public WriteArrayElementsAsync helper on MxGatewaySession that builds
an MxValue{SparseArrayValue} and delegates to the existing WriteAsync. Extracts
the MxValue construction into an internal static BuildSparseArray builder for
unit-testability. Two new tests cover builder output shape and the full write
command path. README documents the reset (not preserve) semantics alongside
the existing whole-array guidance.
388 lines
16 KiB
C#
388 lines
16 KiB
C#
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using Grpc.Core;
|
|
|
|
namespace ZB.MOM.WW.MxGateway.Client.Tests;
|
|
|
|
/// <summary>Tests for MxGatewaySession and client command behavior.</summary>
|
|
public sealed class MxGatewayClientSessionTests
|
|
{
|
|
/// <summary>
/// Checks that the raw open-session call reaches the transport carrying the
/// bearer authorization header and the caller's cancellation token.
/// </summary>
[Fact]
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
{
    using CancellationTokenSource cts = new();
    FakeGatewayTransport fakeTransport = CreateTransport();
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    CancellationToken expectedToken = cts.Token;

    await gatewayClient.OpenSessionRawAsync(new OpenSessionRequest(), expectedToken);

    // The fake transport must have recorded exactly one open-session call.
    var recorded = Assert.Single(fakeTransport.OpenSessionCalls);
    var options = recorded.CallOptions;
    Assert.Equal("Bearer test-api-key", options.Headers?.GetValue("authorization"));
    Assert.Equal(expectedToken, options.CancellationToken);
}
|
|
|
|
/// <summary>
/// Checks that the session returned by OpenSessionAsync exposes the session id
/// and the very same open reply instance held by the transport.
/// </summary>
[Fact]
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.OpenSessionReply.WorkerProcessId = 1234;
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);

    MxGatewaySession opened = await gatewayClient.OpenSessionAsync();

    Assert.Equal("session-fixture", opened.SessionId);

    // Reference equality: the reply must be passed through, not copied.
    var openReply = opened.OpenSessionReply;
    Assert.Same(fakeTransport.OpenSessionReply, openReply);
    Assert.Equal(1234, openReply.WorkerProcessId);
}
|
|
|
|
/// <summary>
/// Checks that RegisterAsync sends a single Register command for the session
/// and returns the server handle contained in the reply.
/// </summary>
[Fact]
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
{
    MxCommandReply cannedReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Register,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        Register = new RegisterReply { ServerHandle = 12 },
    };
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.AddInvokeReply(cannedReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();

    int handle = await session.RegisterAsync("fixture-client");

    Assert.Equal(12, handle);

    MxCommandRequest sent = Assert.Single(fakeTransport.InvokeCalls).Request;
    Assert.Equal("session-fixture", sent.SessionId);
    // A correlation id must be present on the outgoing request.
    Assert.False(string.IsNullOrWhiteSpace(sent.ClientCorrelationId));
    Assert.Equal(MxCommandKind.Register, sent.Command.Kind);
    Assert.Equal("fixture-client", sent.Command.Register.ClientName);
}
|
|
|
|
/// <summary>
/// Checks that AddItem2Async sends an AddItem2 command carrying the server handle,
/// item definition and item context, and returns the item handle from the reply.
/// </summary>
[Fact]
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
{
    MxCommandReply cannedReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.AddItem2,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        AddItem2 = new AddItem2Reply { ItemHandle = 34 },
    };
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.AddInvokeReply(cannedReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();

    int handle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");

    Assert.Equal(34, handle);

    var recorded = Assert.Single(fakeTransport.InvokeCalls);
    MxCommandRequest sent = recorded.Request;
    Assert.Equal(MxCommandKind.AddItem2, sent.Command.Kind);

    var addItem2 = sent.Command.AddItem2;
    Assert.Equal(12, addItem2.ServerHandle);
    Assert.Equal("Area001.Pump001.Speed", addItem2.ItemDefinition);
    Assert.Equal("runtime", addItem2.ItemContext);
}
|
|
|
|
/// <summary>
/// Checks that WriteRawAsync sends a Write command holding the handles, the user id
/// and the exact MxValue instance supplied by the caller, and returns the reply.
/// </summary>
[Fact]
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    MxCommandReply cannedReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Write,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };
    fakeTransport.AddInvokeReply(cannedReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();
    var rawValue = new MxValue
    {
        DataType = MxDataType.Integer,
        VariantType = "VT_I4",
        Int32Value = 123,
    };

    MxCommandReply result = await session.WriteRawAsync(12, 34, rawValue, 56);

    Assert.Equal(MxCommandKind.Write, result.Kind);

    MxCommandRequest sent = Assert.Single(fakeTransport.InvokeCalls).Request;
    Assert.Equal(MxCommandKind.Write, sent.Command.Kind);

    var write = sent.Command.Write;
    Assert.Equal(12, write.ServerHandle);
    Assert.Equal(34, write.ItemHandle);
    // The raw path must forward the caller's value object itself, not a copy.
    Assert.Same(rawValue, write.Value);
    Assert.Equal(56, write.UserId);
}
|
|
|
|
/// <summary>
/// Verifies that Write2RawAsync sends a Write2 command holding the handles, the user id,
/// and the exact value and timestamp MxValue instances supplied by the caller.
/// </summary>
[Fact]
public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp()
{
    FakeGatewayTransport transport = CreateTransport();
    transport.AddInvokeReply(new MxCommandReply
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Write2,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    });
    await using MxGatewayClient client = CreateClient(transport);
    MxGatewaySession session = await client.OpenSessionAsync();
    MxValue value = 123.ToMxValue();

    // Build 2026-01-01T00:00:00Z explicitly instead of DateTimeOffset.Parse(string):
    // the single-argument Parse uses the current culture, so the fixture would depend
    // on the culture and calendar of the machine running the test.
    DateTimeOffset timestamp = new(2026, 1, 1, 0, 0, 0, TimeSpan.Zero);
    MxValue timestampValue = timestamp.ToMxValue();

    MxCommandReply reply = await session.Write2RawAsync(12, 34, value, timestampValue, 56);

    Assert.Equal(MxCommandKind.Write2, reply.Kind);

    MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
    Assert.Equal(MxCommandKind.Write2, request.Command.Kind);
    Assert.Equal(12, request.Command.Write2.ServerHandle);
    Assert.Equal(34, request.Command.Write2.ItemHandle);
    // Both values must be forwarded by reference, not rebuilt.
    Assert.Same(value, request.Command.Write2.Value);
    Assert.Same(timestampValue, request.Command.Write2.TimestampValue);
    Assert.Equal(56, request.Command.Write2.UserId);
}
|
|
|
|
/// <summary>
/// Checks that SubscribeBulkAsync issues exactly one SubscribeBulk command with the
/// requested tag addresses and returns the per-item results from the reply.
/// </summary>
[Fact]
public async Task SubscribeBulkAsync_BuildsOneBulkCommandAndReturnsPerItemResults()
{
    SubscribeResult cannedResult = new()
    {
        ServerHandle = 12,
        TagAddress = "Area001.Pump001.Speed",
        ItemHandle = 34,
        WasSuccessful = true,
    };
    MxCommandReply cannedReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.SubscribeBulk,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        SubscribeBulk = new BulkSubscribeReply
        {
            Results = { cannedResult },
        },
    };
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.AddInvokeReply(cannedReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();

    IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
        12,
        ["Area001.Pump001.Speed"]);

    SubscribeResult only = Assert.Single(results);
    Assert.Equal(34, only.ItemHandle);

    // One bulk command, not one command per tag.
    MxCommandRequest sent = Assert.Single(fakeTransport.InvokeCalls).Request;
    Assert.Equal(MxCommandKind.SubscribeBulk, sent.Command.Kind);

    var bulk = sent.Command.SubscribeBulk;
    Assert.Equal(12, bulk.ServerHandle);
    Assert.Equal(["Area001.Pump001.Speed"], bulk.TagAddresses);
}
|
|
|
|
/// <summary>
/// Checks that StreamEventsAsync yields the queued events in the order they were
/// added to the transport and issues one stream request for the session.
/// </summary>
[Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{
    MxEvent dataChange = new()
    {
        SessionId = "session-fixture",
        Family = MxEventFamily.OnDataChange,
        WorkerSequence = 1,
    };
    MxEvent writeComplete = new()
    {
        SessionId = "session-fixture",
        Family = MxEventFamily.OnWriteComplete,
        WorkerSequence = 2,
    };
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.AddEvent(dataChange);
    fakeTransport.AddEvent(writeComplete);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();
    List<ulong> observed = [];

    await foreach (MxEvent received in session.StreamEventsAsync(afterWorkerSequence: 0))
    {
        observed.Add(received.WorkerSequence);
    }

    Assert.Equal([1UL, 2UL], observed);

    var streamCall = Assert.Single(fakeTransport.StreamEventsCalls);
    StreamEventsRequest sent = streamCall.Request;
    Assert.Equal("session-fixture", sent.SessionId);
}
|
|
|
|
/// <summary>
/// Checks that calling CloseAsync twice sends a single close request and that
/// both calls return the same reply instance.
/// </summary>
[Fact]
public async Task CloseAsync_IsExplicitAndIdempotent()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();

    CloseSessionReply initial = await session.CloseAsync();
    CloseSessionReply repeated = await session.CloseAsync();

    // The second close must hand back the first reply rather than calling the gateway again.
    Assert.Same(initial, repeated);

    var closeCall = Assert.Single(fakeTransport.CloseSessionCalls);
    Assert.Equal("session-fixture", closeCall.Request.SessionId);
}
|
|
|
|
/// <summary>
/// Checks that a Ping command is retried after one transient RPC failure,
/// resulting in two invoke calls on the transport.
/// </summary>
[Fact]
public async Task InvokeAsync_RetriesSafeDiagnosticCommandOnTransientGrpcFailure()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    // First attempt fails with the transient exception; the queued reply serves the retry.
    fakeTransport.InvokeExceptions.Enqueue(CreateTransientRpcException());
    MxCommandReply pingReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Ping,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };
    fakeTransport.AddInvokeReply(pingReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();

    await session.InvokeAsync(new MxCommandRequest
    {
        SessionId = session.SessionId,
        Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() },
    });

    int attempts = fakeTransport.InvokeCalls.Count;
    Assert.Equal(2, attempts);
}
|
|
|
|
/// <summary>
/// Checks that a transient RPC failure during OpenSessionAsync surfaces as an
/// RpcException after a single open-session attempt.
/// </summary>
[Fact]
public async Task OpenSessionAsync_DoesNotRetryTransientGrpcFailure()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.OpenSessionExceptions.Enqueue(CreateTransientRpcException());
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    Func<Task> openSession = async () => await gatewayClient.OpenSessionAsync();

    await Assert.ThrowsAsync<RpcException>(openSession);

    // Exactly one attempt: the failure must not trigger a retry.
    Assert.Single(fakeTransport.OpenSessionCalls);
}
|
|
|
|
/// <summary>
/// Checks that a write which hits a transient RPC failure surfaces the
/// RpcException after a single invoke attempt.
/// </summary>
[Fact]
public async Task InvokeAsync_DoesNotRetryWriteCommand()
{
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.InvokeExceptions.Enqueue(CreateTransientRpcException());
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();
    Func<Task> write = async () =>
        await session.WriteRawAsync(1, 2, 3.ToMxValue(), userId: 0);

    await Assert.ThrowsAsync<RpcException>(write);

    // Exactly one attempt: the write must not be sent a second time.
    Assert.Single(fakeTransport.InvokeCalls);
}
|
|
|
|
/// <summary>
/// Checks that the cancellation token given to AdviseAsync is the token found
/// on the call options recorded by the transport.
/// </summary>
[Fact]
public async Task InvokeHelpers_PassCancellationTokenToTransport()
{
    using CancellationTokenSource cts = new();
    FakeGatewayTransport fakeTransport = CreateTransport();
    MxCommandReply adviseReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Advise,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };
    fakeTransport.AddInvokeReply(adviseReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();
    CancellationToken expectedToken = cts.Token;

    await session.AdviseAsync(12, 34, expectedToken);

    var recorded = Assert.Single(fakeTransport.InvokeCalls);
    Assert.Equal(expectedToken, recorded.CallOptions.CancellationToken);
}
|
|
|
|
/// <summary>
/// Checks that BuildSparseArray returns an MxValue whose SparseArrayValue carries the
/// requested total length and element data type, plus one element per supplied index
/// referencing the supplied value instance.
/// </summary>
[Fact]
public void BuildSparseArray_ProducesSparseArrayValueWithCorrectTotalLengthAndElements()
{
    MxValue atIndex0 = 42.ToMxValue();
    MxValue atIndex3 = 99.ToMxValue();
    Dictionary<uint, MxValue> elements = new()
    {
        [0u] = atIndex0,
        [3u] = atIndex3,
    };

    MxValue built = MxGatewaySession.BuildSparseArray(MxDataType.Integer, totalLength: 10, elements);

    Assert.Equal(MxValue.KindOneofCase.SparseArrayValue, built.KindCase);

    var sparse = built.SparseArrayValue;
    Assert.Equal(10u, sparse.TotalLength);
    Assert.Equal(MxDataType.Integer, sparse.ElementDataType);
    Assert.Equal(2, sparse.Elements.Count);

    // Look elements up by index so the test does not depend on their order.
    MxSparseElement first = Assert.Single(sparse.Elements, e => e.Index == 0u);
    Assert.Same(atIndex0, first.Value);

    MxSparseElement second = Assert.Single(sparse.Elements, e => e.Index == 3u);
    Assert.Same(atIndex3, second.Value);
}
|
|
|
|
/// <summary>
/// Checks that WriteArrayElementsAsync sends one Write command whose value is a
/// SparseArrayValue with the given total length, element data type and element.
/// </summary>
[Fact]
public async Task WriteArrayElementsAsync_BuildsWriteCommandWithSparseArrayValue()
{
    MxCommandReply writeReply = new()
    {
        SessionId = "session-fixture",
        Kind = MxCommandKind.Write,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };
    FakeGatewayTransport fakeTransport = CreateTransport();
    fakeTransport.AddInvokeReply(writeReply);
    await using MxGatewayClient gatewayClient = CreateClient(fakeTransport);
    MxGatewaySession session = await gatewayClient.OpenSessionAsync();
    Dictionary<uint, MxValue> elements = new() { [1u] = 7.ToMxValue() };

    await session.WriteArrayElementsAsync(
        serverHandle: 12,
        itemHandle: 34,
        elementDataType: MxDataType.Integer,
        totalLength: 5,
        elements: elements,
        userId: 56);

    MxCommandRequest sent = Assert.Single(fakeTransport.InvokeCalls).Request;
    Assert.Equal(MxCommandKind.Write, sent.Command.Kind);

    var write = sent.Command.Write;
    Assert.Equal(12, write.ServerHandle);
    Assert.Equal(34, write.ItemHandle);
    Assert.Equal(56, write.UserId);

    MxValue payload = write.Value;
    Assert.Equal(MxValue.KindOneofCase.SparseArrayValue, payload.KindCase);

    var sparse = payload.SparseArrayValue;
    Assert.Equal(5u, sparse.TotalLength);
    Assert.Equal(MxDataType.Integer, sparse.ElementDataType);

    MxSparseElement only = Assert.Single(sparse.Elements);
    Assert.Equal(1u, only.Index);
    Assert.Equal(7, only.Value.Int32Value);
}
|
|
|
|
/// <summary>Creates a client wired to the given fake transport, using that transport's own options.</summary>
/// <param name="transport">Fake transport that supplies the options and receives the client's calls.</param>
/// <returns>A new <see cref="MxGatewayClient"/> bound to <paramref name="transport"/>.</returns>
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
    => new MxGatewayClient(transport.Options, transport);
|
|
|
|
/// <summary>Creates a fake transport configured with the fixture endpoint and API key used by these tests.</summary>
/// <returns>A new <see cref="FakeGatewayTransport"/> built from the fixture options.</returns>
private static FakeGatewayTransport CreateTransport()
{
    MxGatewayClientOptions fixtureOptions = new()
    {
        Endpoint = new Uri("http://localhost:5000"),
        ApiKey = "test-api-key",
    };

    return new FakeGatewayTransport(fixtureOptions);
}
|
|
|
|
/// <summary>Builds the <see cref="RpcException"/> the retry tests enqueue as a transient failure.</summary>
/// <returns>An exception with status <see cref="StatusCode.Unavailable"/> and detail "gateway unavailable".</returns>
private static RpcException CreateTransientRpcException()
{
    Status unavailable = new(StatusCode.Unavailable, "gateway unavailable");
    return new RpcException(unavailable);
}
|
|
}
|