feat(infra): add ScadaServiceImpl with full proto parity for all RPCs
This commit is contained in:
255
infra/lmxfakeproxy/Services/ScadaServiceImpl.cs
Normal file
255
infra/lmxfakeproxy/Services/ScadaServiceImpl.cs
Normal file
@@ -0,0 +1,255 @@
|
|||||||
|
using Grpc.Core;
|
||||||
|
using LmxFakeProxy.Bridge;
|
||||||
|
using LmxFakeProxy.Grpc;
|
||||||
|
using LmxFakeProxy.Sessions;
|
||||||
|
|
||||||
|
namespace LmxFakeProxy.Services;
|
||||||
|
|
||||||
|
public class ScadaServiceImpl : ScadaService.ScadaServiceBase
|
||||||
|
{
|
||||||
|
private readonly SessionManager _sessions;
|
||||||
|
private readonly IOpcUaBridge _bridge;
|
||||||
|
private readonly TagMapper _tagMapper;
|
||||||
|
|
||||||
|
public ScadaServiceImpl(SessionManager sessions, IOpcUaBridge bridge, TagMapper tagMapper)
|
||||||
|
{
|
||||||
|
_sessions = sessions;
|
||||||
|
_bridge = bridge;
|
||||||
|
_tagMapper = tagMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task<ConnectResponse> Connect(ConnectRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
var (success, message, sessionId) = _sessions.Connect(request.ClientId, request.ApiKey);
|
||||||
|
return Task.FromResult(new ConnectResponse { Success = success, Message = message, SessionId = sessionId });
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task<DisconnectResponse> Disconnect(DisconnectRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
var ok = _sessions.Disconnect(request.SessionId);
|
||||||
|
return Task.FromResult(new DisconnectResponse
|
||||||
|
{
|
||||||
|
Success = ok,
|
||||||
|
Message = ok ? "Disconnected" : "Session not found"
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task<GetConnectionStateResponse> GetConnectionState(
|
||||||
|
GetConnectionStateRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
var (found, clientId, ticks) = _sessions.GetConnectionState(request.SessionId);
|
||||||
|
return Task.FromResult(new GetConnectionStateResponse
|
||||||
|
{
|
||||||
|
IsConnected = found, ClientId = clientId, ConnectedSinceUtcTicks = ticks
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task<CheckApiKeyResponse> CheckApiKey(CheckApiKeyRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
var valid = _sessions.CheckApiKey(request.ApiKey);
|
||||||
|
return Task.FromResult(new CheckApiKeyResponse
|
||||||
|
{
|
||||||
|
IsValid = valid, Message = valid ? "Valid" : "Invalid API key"
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task<ReadResponse> Read(ReadRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
return new ReadResponse { Success = false, Message = "Invalid or expired session" };
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var nodeId = _tagMapper.ToOpcNodeId(request.Tag);
|
||||||
|
var result = await _bridge.ReadAsync(nodeId, context.CancellationToken);
|
||||||
|
return new ReadResponse
|
||||||
|
{
|
||||||
|
Success = true,
|
||||||
|
Vtq = TagMapper.ToVtqMessage(request.Tag, result.Value, result.SourceTimestamp, result.StatusCode)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new ReadResponse { Success = false, Message = ex.Message };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task<ReadBatchResponse> ReadBatch(ReadBatchRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
return new ReadBatchResponse { Success = false, Message = "Invalid or expired session" };
|
||||||
|
|
||||||
|
var response = new ReadBatchResponse { Success = true };
|
||||||
|
foreach (var tag in request.Tags)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var nodeId = _tagMapper.ToOpcNodeId(tag);
|
||||||
|
var result = await _bridge.ReadAsync(nodeId, context.CancellationToken);
|
||||||
|
response.Vtqs.Add(TagMapper.ToVtqMessage(tag, result.Value, result.SourceTimestamp, result.StatusCode));
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
response.Vtqs.Add(new VtqMessage
|
||||||
|
{
|
||||||
|
Tag = tag, Value = "", Quality = "Bad", TimestampUtcTicks = DateTime.UtcNow.Ticks
|
||||||
|
});
|
||||||
|
response.Message = ex.Message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task<WriteResponse> Write(WriteRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
return new WriteResponse { Success = false, Message = "Invalid or expired session" };
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var nodeId = _tagMapper.ToOpcNodeId(request.Tag);
|
||||||
|
var value = TagMapper.ParseWriteValue(request.Value);
|
||||||
|
var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken);
|
||||||
|
return statusCode == 0
|
||||||
|
? new WriteResponse { Success = true }
|
||||||
|
: new WriteResponse { Success = false, Message = $"OPC UA write failed: 0x{statusCode:X8}" };
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new WriteResponse { Success = false, Message = ex.Message };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task<WriteBatchResponse> WriteBatch(WriteBatchRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
return new WriteBatchResponse { Success = false, Message = "Invalid or expired session" };
|
||||||
|
|
||||||
|
var response = new WriteBatchResponse { Success = true };
|
||||||
|
foreach (var item in request.Items)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var nodeId = _tagMapper.ToOpcNodeId(item.Tag);
|
||||||
|
var value = TagMapper.ParseWriteValue(item.Value);
|
||||||
|
var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken);
|
||||||
|
response.Results.Add(new Grpc.WriteResult
|
||||||
|
{
|
||||||
|
Tag = item.Tag, Success = statusCode == 0,
|
||||||
|
Message = statusCode == 0 ? "" : $"0x{statusCode:X8}"
|
||||||
|
});
|
||||||
|
if (statusCode != 0) response.Success = false;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
response.Results.Add(new Grpc.WriteResult { Tag = item.Tag, Success = false, Message = ex.Message });
|
||||||
|
response.Success = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task<WriteBatchAndWaitResponse> WriteBatchAndWait(
|
||||||
|
WriteBatchAndWaitRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
return new WriteBatchAndWaitResponse { Success = false, Message = "Invalid or expired session" };
|
||||||
|
|
||||||
|
var startTime = DateTime.UtcNow;
|
||||||
|
var writeResults = new List<Grpc.WriteResult>();
|
||||||
|
var allWritesOk = true;
|
||||||
|
|
||||||
|
foreach (var item in request.Items)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var nodeId = _tagMapper.ToOpcNodeId(item.Tag);
|
||||||
|
var value = TagMapper.ParseWriteValue(item.Value);
|
||||||
|
var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken);
|
||||||
|
writeResults.Add(new Grpc.WriteResult
|
||||||
|
{
|
||||||
|
Tag = item.Tag, Success = statusCode == 0,
|
||||||
|
Message = statusCode == 0 ? "" : $"0x{statusCode:X8}"
|
||||||
|
});
|
||||||
|
if (statusCode != 0) allWritesOk = false;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
writeResults.Add(new Grpc.WriteResult { Tag = item.Tag, Success = false, Message = ex.Message });
|
||||||
|
allWritesOk = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!allWritesOk)
|
||||||
|
{
|
||||||
|
var failResp = new WriteBatchAndWaitResponse { Success = false, Message = "Write failed" };
|
||||||
|
failResp.WriteResults.AddRange(writeResults);
|
||||||
|
return failResp;
|
||||||
|
}
|
||||||
|
|
||||||
|
var flagNodeId = _tagMapper.ToOpcNodeId(request.FlagTag);
|
||||||
|
var timeoutMs = request.TimeoutMs > 0 ? request.TimeoutMs : 5000;
|
||||||
|
var pollMs = request.PollIntervalMs > 0 ? request.PollIntervalMs : 100;
|
||||||
|
var deadline = startTime.AddMilliseconds(timeoutMs);
|
||||||
|
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
context.CancellationToken.ThrowIfCancellationRequested();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var readResult = await _bridge.ReadAsync(flagNodeId, context.CancellationToken);
|
||||||
|
if (readResult.Value?.ToString() == request.FlagValue)
|
||||||
|
{
|
||||||
|
var elapsed = (int)(DateTime.UtcNow - startTime).TotalMilliseconds;
|
||||||
|
var resp = new WriteBatchAndWaitResponse { Success = true, FlagReached = true, ElapsedMs = elapsed };
|
||||||
|
resp.WriteResults.AddRange(writeResults);
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch { }
|
||||||
|
await Task.Delay(pollMs, context.CancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
var finalResp = new WriteBatchAndWaitResponse
|
||||||
|
{
|
||||||
|
Success = true, FlagReached = false,
|
||||||
|
ElapsedMs = (int)(DateTime.UtcNow - startTime).TotalMilliseconds,
|
||||||
|
Message = "Timeout waiting for flag value"
|
||||||
|
};
|
||||||
|
finalResp.WriteResults.AddRange(writeResults);
|
||||||
|
return finalResp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task Subscribe(
|
||||||
|
SubscribeRequest request, IServerStreamWriter<VtqMessage> responseStream, ServerCallContext context)
|
||||||
|
{
|
||||||
|
if (!_sessions.ValidateSession(request.SessionId))
|
||||||
|
throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid or expired session"));
|
||||||
|
|
||||||
|
var nodeIds = request.Tags.Select(t => _tagMapper.ToOpcNodeId(t)).ToList();
|
||||||
|
var tagByNodeId = request.Tags.Zip(nodeIds).ToDictionary(p => p.Second, p => p.First);
|
||||||
|
|
||||||
|
var handle = await _bridge.AddMonitoredItemsAsync(
|
||||||
|
nodeIds, request.SamplingMs,
|
||||||
|
(nodeId, value, timestamp, statusCode) =>
|
||||||
|
{
|
||||||
|
if (tagByNodeId.TryGetValue(nodeId, out var tag))
|
||||||
|
{
|
||||||
|
var vtq = TagMapper.ToVtqMessage(tag, value, timestamp, statusCode);
|
||||||
|
try { responseStream.WriteAsync(vtq).Wait(); }
|
||||||
|
catch { }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
context.CancellationToken);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(Timeout.Infinite, context.CancellationToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { }
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await _bridge.RemoveMonitoredItemsAsync(handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@
|
|||||||
<PackageReference Include="xunit" Version="2.9.3" />
|
<PackageReference Include="xunit" Version="2.9.3" />
|
||||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.0" />
|
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.0" />
|
||||||
<PackageReference Include="NSubstitute" Version="5.3.0" />
|
<PackageReference Include="NSubstitute" Version="5.3.0" />
|
||||||
|
<PackageReference Include="Grpc.Core" Version="2.46.6" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
164
infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs
Normal file
164
infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
using Grpc.Core;
|
||||||
|
using NSubstitute;
|
||||||
|
using LmxFakeProxy.Bridge;
|
||||||
|
using LmxFakeProxy.Grpc;
|
||||||
|
using LmxFakeProxy.Sessions;
|
||||||
|
using LmxFakeProxy.Services;
|
||||||
|
|
||||||
|
namespace LmxFakeProxy.Tests;
|
||||||
|
|
||||||
|
public class ScadaServiceTests
|
||||||
|
{
|
||||||
|
private readonly IOpcUaBridge _mockBridge;
|
||||||
|
private readonly SessionManager _sessionMgr;
|
||||||
|
private readonly TagMapper _tagMapper;
|
||||||
|
private readonly ScadaServiceImpl _service;
|
||||||
|
|
||||||
|
public ScadaServiceTests()
|
||||||
|
{
|
||||||
|
_mockBridge = Substitute.For<IOpcUaBridge>();
|
||||||
|
_mockBridge.IsConnected.Returns(true);
|
||||||
|
_sessionMgr = new SessionManager(null);
|
||||||
|
_tagMapper = new TagMapper("ns=3;s=");
|
||||||
|
_service = new ScadaServiceImpl(_sessionMgr, _mockBridge, _tagMapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
private string ConnectClient(string clientId = "test-client")
|
||||||
|
{
|
||||||
|
var (_, _, sessionId) = _sessionMgr.Connect(clientId, "");
|
||||||
|
return sessionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ServerCallContext MockContext()
|
||||||
|
{
|
||||||
|
return new TestServerCallContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Connect_ReturnsSessionId()
|
||||||
|
{
|
||||||
|
var resp = await _service.Connect(
|
||||||
|
new ConnectRequest { ClientId = "c1", ApiKey = "" }, MockContext());
|
||||||
|
Assert.True(resp.Success);
|
||||||
|
Assert.NotEmpty(resp.SessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Read_ValidSession_ReturnsVtq()
|
||||||
|
{
|
||||||
|
var sid = ConnectClient();
|
||||||
|
_mockBridge.ReadAsync("ns=3;s=Motor.Speed", Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new OpcUaReadResult(42.5, DateTime.UtcNow, 0));
|
||||||
|
|
||||||
|
var resp = await _service.Read(
|
||||||
|
new ReadRequest { SessionId = sid, Tag = "Motor.Speed" }, MockContext());
|
||||||
|
|
||||||
|
Assert.True(resp.Success);
|
||||||
|
Assert.Equal("42.5", resp.Vtq.Value);
|
||||||
|
Assert.Equal("Good", resp.Vtq.Quality);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Read_InvalidSession_ReturnsFailure()
|
||||||
|
{
|
||||||
|
var resp = await _service.Read(
|
||||||
|
new ReadRequest { SessionId = "bogus", Tag = "Motor.Speed" }, MockContext());
|
||||||
|
Assert.False(resp.Success);
|
||||||
|
Assert.Contains("Invalid", resp.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadBatch_ReturnsAllTags()
|
||||||
|
{
|
||||||
|
var sid = ConnectClient();
|
||||||
|
_mockBridge.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new OpcUaReadResult(1.0, DateTime.UtcNow, 0));
|
||||||
|
|
||||||
|
var req = new ReadBatchRequest { SessionId = sid };
|
||||||
|
req.Tags.AddRange(new[] { "Motor.Speed", "Pump.FlowRate" });
|
||||||
|
|
||||||
|
var resp = await _service.ReadBatch(req, MockContext());
|
||||||
|
|
||||||
|
Assert.True(resp.Success);
|
||||||
|
Assert.Equal(2, resp.Vtqs.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_ValidSession_Succeeds()
|
||||||
|
{
|
||||||
|
var sid = ConnectClient();
|
||||||
|
_mockBridge.WriteAsync("ns=3;s=Motor.Speed", Arg.Any<object?>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(0u);
|
||||||
|
|
||||||
|
var resp = await _service.Write(
|
||||||
|
new WriteRequest { SessionId = sid, Tag = "Motor.Speed", Value = "42.5" }, MockContext());
|
||||||
|
|
||||||
|
Assert.True(resp.Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_InvalidSession_ReturnsFailure()
|
||||||
|
{
|
||||||
|
var resp = await _service.Write(
|
||||||
|
new WriteRequest { SessionId = "bogus", Tag = "Motor.Speed", Value = "42.5" }, MockContext());
|
||||||
|
Assert.False(resp.Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteBatch_ReturnsPerItemResults()
|
||||||
|
{
|
||||||
|
var sid = ConnectClient();
|
||||||
|
_mockBridge.WriteAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(0u);
|
||||||
|
|
||||||
|
var req = new WriteBatchRequest { SessionId = sid };
|
||||||
|
req.Items.Add(new WriteItem { Tag = "Motor.Speed", Value = "42.5" });
|
||||||
|
req.Items.Add(new WriteItem { Tag = "Pump.FlowRate", Value = "10.0" });
|
||||||
|
|
||||||
|
var resp = await _service.WriteBatch(req, MockContext());
|
||||||
|
|
||||||
|
Assert.True(resp.Success);
|
||||||
|
Assert.Equal(2, resp.Results.Count);
|
||||||
|
Assert.All(resp.Results, r => Assert.True(r.Success));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CheckApiKey_Valid_ReturnsTrue()
|
||||||
|
{
|
||||||
|
var resp = await _service.CheckApiKey(
|
||||||
|
new CheckApiKeyRequest { ApiKey = "anything" }, MockContext());
|
||||||
|
Assert.True(resp.IsValid);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CheckApiKey_Invalid_ReturnsFalse()
|
||||||
|
{
|
||||||
|
var mgr = new SessionManager("secret");
|
||||||
|
var svc = new ScadaServiceImpl(mgr, _mockBridge, _tagMapper);
|
||||||
|
|
||||||
|
var resp = await svc.CheckApiKey(
|
||||||
|
new CheckApiKeyRequest { ApiKey = "wrong" }, MockContext());
|
||||||
|
Assert.False(resp.IsValid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Minimal ServerCallContext for unit testing gRPC services.
|
||||||
|
/// </summary>
|
||||||
|
internal class TestServerCallContext : ServerCallContext
|
||||||
|
{
|
||||||
|
protected override string MethodCore => "test";
|
||||||
|
protected override string HostCore => "localhost";
|
||||||
|
protected override string PeerCore => "test-peer";
|
||||||
|
protected override DateTime DeadlineCore => DateTime.MaxValue;
|
||||||
|
protected override Metadata RequestHeadersCore => new();
|
||||||
|
protected override CancellationToken CancellationTokenCore => CancellationToken.None;
|
||||||
|
protected override Metadata ResponseTrailersCore => new();
|
||||||
|
protected override Status StatusCore { get; set; }
|
||||||
|
protected override WriteOptions? WriteOptionsCore { get; set; }
|
||||||
|
protected override AuthContext AuthContextCore => new("test", new Dictionary<string, List<AuthProperty>>());
|
||||||
|
|
||||||
|
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options) =>
|
||||||
|
throw new NotImplementedException();
|
||||||
|
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) => Task.CompletedTask;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user