From 68522504973665a317bde893323378c15c30d2d0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 19 Mar 2026 11:24:26 -0400 Subject: [PATCH] feat(infra): add ScadaServiceImpl with full proto parity for all RPCs --- .../lmxfakeproxy/Services/ScadaServiceImpl.cs | 255 ++++++++++++++++++ .../LmxFakeProxy.Tests.csproj | 1 + .../LmxFakeProxy.Tests/ScadaServiceTests.cs | 164 +++++++++++ 3 files changed, 420 insertions(+) create mode 100644 infra/lmxfakeproxy/Services/ScadaServiceImpl.cs create mode 100644 infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs diff --git a/infra/lmxfakeproxy/Services/ScadaServiceImpl.cs b/infra/lmxfakeproxy/Services/ScadaServiceImpl.cs new file mode 100644 index 0000000..aaa0bcf --- /dev/null +++ b/infra/lmxfakeproxy/Services/ScadaServiceImpl.cs @@ -0,0 +1,255 @@ +using Grpc.Core; +using LmxFakeProxy.Bridge; +using LmxFakeProxy.Grpc; +using LmxFakeProxy.Sessions; + +namespace LmxFakeProxy.Services; + +public class ScadaServiceImpl : ScadaService.ScadaServiceBase +{ + private readonly SessionManager _sessions; + private readonly IOpcUaBridge _bridge; + private readonly TagMapper _tagMapper; + + public ScadaServiceImpl(SessionManager sessions, IOpcUaBridge bridge, TagMapper tagMapper) + { + _sessions = sessions; + _bridge = bridge; + _tagMapper = tagMapper; + } + + public override Task Connect(ConnectRequest request, ServerCallContext context) + { + var (success, message, sessionId) = _sessions.Connect(request.ClientId, request.ApiKey); + return Task.FromResult(new ConnectResponse { Success = success, Message = message, SessionId = sessionId }); + } + + public override Task Disconnect(DisconnectRequest request, ServerCallContext context) + { + var ok = _sessions.Disconnect(request.SessionId); + return Task.FromResult(new DisconnectResponse + { + Success = ok, + Message = ok ? "Disconnected" : "Session not found" + }); + } + + public override Task GetConnectionState( + GetConnectionStateRequest request, ServerCallContext context) + { + var (found, clientId, ticks) = _sessions.GetConnectionState(request.SessionId); + return Task.FromResult(new GetConnectionStateResponse + { + IsConnected = found, ClientId = clientId, ConnectedSinceUtcTicks = ticks + }); + } + + public override Task CheckApiKey(CheckApiKeyRequest request, ServerCallContext context) + { + var valid = _sessions.CheckApiKey(request.ApiKey); + return Task.FromResult(new CheckApiKeyResponse + { + IsValid = valid, Message = valid ? "Valid" : "Invalid API key" + }); + } + + public override async Task Read(ReadRequest request, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + return new ReadResponse { Success = false, Message = "Invalid or expired session" }; + + try + { + var nodeId = _tagMapper.ToOpcNodeId(request.Tag); + var result = await _bridge.ReadAsync(nodeId, context.CancellationToken); + return new ReadResponse + { + Success = true, + Vtq = TagMapper.ToVtqMessage(request.Tag, result.Value, result.SourceTimestamp, result.StatusCode) + }; + } + catch (Exception ex) + { + return new ReadResponse { Success = false, Message = ex.Message }; + } + } + + public override async Task ReadBatch(ReadBatchRequest request, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + return new ReadBatchResponse { Success = false, Message = "Invalid or expired session" }; + + var response = new ReadBatchResponse { Success = true }; + foreach (var tag in request.Tags) + { + try + { + var nodeId = _tagMapper.ToOpcNodeId(tag); + var result = await _bridge.ReadAsync(nodeId, context.CancellationToken); + response.Vtqs.Add(TagMapper.ToVtqMessage(tag, result.Value, result.SourceTimestamp, result.StatusCode)); + } + catch (Exception ex) + { + response.Vtqs.Add(new VtqMessage + { + Tag = tag, Value = "", Quality = "Bad", TimestampUtcTicks = DateTime.UtcNow.Ticks + }); + response.Message = ex.Message; + } + } + return response; + } + + public override async Task Write(WriteRequest request, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + return new WriteResponse { Success = false, Message = "Invalid or expired session" }; + + try + { + var nodeId = _tagMapper.ToOpcNodeId(request.Tag); + var value = TagMapper.ParseWriteValue(request.Value); + var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken); + return statusCode == 0 + ? new WriteResponse { Success = true } + : new WriteResponse { Success = false, Message = $"OPC UA write failed: 0x{statusCode:X8}" }; + } + catch (Exception ex) + { + return new WriteResponse { Success = false, Message = ex.Message }; + } + } + + public override async Task WriteBatch(WriteBatchRequest request, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + return new WriteBatchResponse { Success = false, Message = "Invalid or expired session" }; + + var response = new WriteBatchResponse { Success = true }; + foreach (var item in request.Items) + { + try + { + var nodeId = _tagMapper.ToOpcNodeId(item.Tag); + var value = TagMapper.ParseWriteValue(item.Value); + var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken); + response.Results.Add(new Grpc.WriteResult + { + Tag = item.Tag, Success = statusCode == 0, + Message = statusCode == 0 ? "" : $"0x{statusCode:X8}" + }); + if (statusCode != 0) response.Success = false; + } + catch (Exception ex) + { + response.Results.Add(new Grpc.WriteResult { Tag = item.Tag, Success = false, Message = ex.Message }); + response.Success = false; + } + } + return response; + } + + public override async Task WriteBatchAndWait( + WriteBatchAndWaitRequest request, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + return new WriteBatchAndWaitResponse { Success = false, Message = "Invalid or expired session" }; + + var startTime = DateTime.UtcNow; + var writeResults = new List(); + var allWritesOk = true; + + foreach (var item in request.Items) + { + try + { + var nodeId = _tagMapper.ToOpcNodeId(item.Tag); + var value = TagMapper.ParseWriteValue(item.Value); + var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken); + writeResults.Add(new Grpc.WriteResult + { + Tag = item.Tag, Success = statusCode == 0, + Message = statusCode == 0 ? "" : $"0x{statusCode:X8}" + }); + if (statusCode != 0) allWritesOk = false; + } + catch (Exception ex) + { + writeResults.Add(new Grpc.WriteResult { Tag = item.Tag, Success = false, Message = ex.Message }); + allWritesOk = false; + } + } + + if (!allWritesOk) + { + var failResp = new WriteBatchAndWaitResponse { Success = false, Message = "Write failed" }; + failResp.WriteResults.AddRange(writeResults); + return failResp; + } + + var flagNodeId = _tagMapper.ToOpcNodeId(request.FlagTag); + var timeoutMs = request.TimeoutMs > 0 ? request.TimeoutMs : 5000; + var pollMs = request.PollIntervalMs > 0 ? request.PollIntervalMs : 100; + var deadline = startTime.AddMilliseconds(timeoutMs); + + while (DateTime.UtcNow < deadline) + { + context.CancellationToken.ThrowIfCancellationRequested(); + try + { + var readResult = await _bridge.ReadAsync(flagNodeId, context.CancellationToken); + if (readResult.Value?.ToString() == request.FlagValue) + { + var elapsed = (int)(DateTime.UtcNow - startTime).TotalMilliseconds; + var resp = new WriteBatchAndWaitResponse { Success = true, FlagReached = true, ElapsedMs = elapsed }; + resp.WriteResults.AddRange(writeResults); + return resp; + } + } + catch { } + await Task.Delay(pollMs, context.CancellationToken); + } + + var finalResp = new WriteBatchAndWaitResponse + { + Success = true, FlagReached = false, + ElapsedMs = (int)(DateTime.UtcNow - startTime).TotalMilliseconds, + Message = "Timeout waiting for flag value" + }; + finalResp.WriteResults.AddRange(writeResults); + return finalResp; + } + + public override async Task Subscribe( + SubscribeRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + if (!_sessions.ValidateSession(request.SessionId)) + throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid or expired session")); + + var nodeIds = request.Tags.Select(t => _tagMapper.ToOpcNodeId(t)).ToList(); + var tagByNodeId = request.Tags.Zip(nodeIds).ToDictionary(p => p.Second, p => p.First); + + var handle = await _bridge.AddMonitoredItemsAsync( + nodeIds, request.SamplingMs, + (nodeId, value, timestamp, statusCode) => + { + if (tagByNodeId.TryGetValue(nodeId, out var tag)) + { + var vtq = TagMapper.ToVtqMessage(tag, value, timestamp, statusCode); + try { responseStream.WriteAsync(vtq).Wait(); } + catch { } + } + }, + context.CancellationToken); + + try + { + await Task.Delay(Timeout.Infinite, context.CancellationToken); + } + catch (OperationCanceledException) { } + finally + { + await _bridge.RemoveMonitoredItemsAsync(handle); + } + } +} diff --git a/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/LmxFakeProxy.Tests.csproj b/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/LmxFakeProxy.Tests.csproj index 580a665..a17b68a 100644 --- a/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/LmxFakeProxy.Tests.csproj +++ b/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/LmxFakeProxy.Tests.csproj @@ -12,6 +12,7 @@ + diff --git a/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs b/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs new file mode 100644 index 0000000..df5d86c --- /dev/null +++ b/infra/lmxfakeproxy/tests/LmxFakeProxy.Tests/ScadaServiceTests.cs @@ -0,0 +1,164 @@ +using Grpc.Core; +using NSubstitute; +using LmxFakeProxy.Bridge; +using LmxFakeProxy.Grpc; +using LmxFakeProxy.Sessions; +using LmxFakeProxy.Services; + +namespace LmxFakeProxy.Tests; + +public class ScadaServiceTests +{ + private readonly IOpcUaBridge _mockBridge; + private readonly SessionManager _sessionMgr; + private readonly TagMapper _tagMapper; + private readonly ScadaServiceImpl _service; + + public ScadaServiceTests() + { + _mockBridge = Substitute.For(); + _mockBridge.IsConnected.Returns(true); + _sessionMgr = new SessionManager(null); + _tagMapper = new TagMapper("ns=3;s="); + _service = new ScadaServiceImpl(_sessionMgr, _mockBridge, _tagMapper); + } + + private string ConnectClient(string clientId = "test-client") + { + var (_, _, sessionId) = _sessionMgr.Connect(clientId, ""); + return sessionId; + } + + private static ServerCallContext MockContext() + { + return new TestServerCallContext(); + } + + [Fact] + public async Task Connect_ReturnsSessionId() + { + var resp = await _service.Connect( + new ConnectRequest { ClientId = "c1", ApiKey = "" }, MockContext()); + Assert.True(resp.Success); + Assert.NotEmpty(resp.SessionId); + } + + [Fact] + public async Task Read_ValidSession_ReturnsVtq() + { + var sid = ConnectClient(); + _mockBridge.ReadAsync("ns=3;s=Motor.Speed", Arg.Any()) + .Returns(new OpcUaReadResult(42.5, DateTime.UtcNow, 0)); + + var resp = await _service.Read( + new ReadRequest { SessionId = sid, Tag = "Motor.Speed" }, MockContext()); + + Assert.True(resp.Success); + Assert.Equal("42.5", resp.Vtq.Value); + Assert.Equal("Good", resp.Vtq.Quality); + } + + [Fact] + public async Task Read_InvalidSession_ReturnsFailure() + { + var resp = await _service.Read( + new ReadRequest { SessionId = "bogus", Tag = "Motor.Speed" }, MockContext()); + Assert.False(resp.Success); + Assert.Contains("Invalid", resp.Message); + } + + [Fact] + public async Task ReadBatch_ReturnsAllTags() + { + var sid = ConnectClient(); + _mockBridge.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new OpcUaReadResult(1.0, DateTime.UtcNow, 0)); + + var req = new ReadBatchRequest { SessionId = sid }; + req.Tags.AddRange(new[] { "Motor.Speed", "Pump.FlowRate" }); + + var resp = await _service.ReadBatch(req, MockContext()); + + Assert.True(resp.Success); + Assert.Equal(2, resp.Vtqs.Count); + } + + [Fact] + public async Task Write_ValidSession_Succeeds() + { + var sid = ConnectClient(); + _mockBridge.WriteAsync("ns=3;s=Motor.Speed", Arg.Any(), Arg.Any()) + .Returns(0u); + + var resp = await _service.Write( + new WriteRequest { SessionId = sid, Tag = "Motor.Speed", Value = "42.5" }, MockContext()); + + Assert.True(resp.Success); + } + + [Fact] + public async Task Write_InvalidSession_ReturnsFailure() + { + var resp = await _service.Write( + new WriteRequest { SessionId = "bogus", Tag = "Motor.Speed", Value = "42.5" }, MockContext()); + Assert.False(resp.Success); + } + + [Fact] + public async Task WriteBatch_ReturnsPerItemResults() + { + var sid = ConnectClient(); + _mockBridge.WriteAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(0u); + + var req = new WriteBatchRequest { SessionId = sid }; + req.Items.Add(new WriteItem { Tag = "Motor.Speed", Value = "42.5" }); + req.Items.Add(new WriteItem { Tag = "Pump.FlowRate", Value = "10.0" }); + + var resp = await _service.WriteBatch(req, MockContext()); + + Assert.True(resp.Success); + Assert.Equal(2, resp.Results.Count); + Assert.All(resp.Results, r => Assert.True(r.Success)); + } + + [Fact] + public async Task CheckApiKey_Valid_ReturnsTrue() + { + var resp = await _service.CheckApiKey( + new CheckApiKeyRequest { ApiKey = "anything" }, MockContext()); + Assert.True(resp.IsValid); + } + + [Fact] + public async Task CheckApiKey_Invalid_ReturnsFalse() + { + var mgr = new SessionManager("secret"); + var svc = new ScadaServiceImpl(mgr, _mockBridge, _tagMapper); + + var resp = await svc.CheckApiKey( + new CheckApiKeyRequest { ApiKey = "wrong" }, MockContext()); + Assert.False(resp.IsValid); + } +} + +/// +/// Minimal ServerCallContext for unit testing gRPC services. +/// +internal class TestServerCallContext : ServerCallContext +{ + protected override string MethodCore => "test"; + protected override string HostCore => "localhost"; + protected override string PeerCore => "test-peer"; + protected override DateTime DeadlineCore => DateTime.MaxValue; + protected override Metadata RequestHeadersCore => new(); + protected override CancellationToken CancellationTokenCore => CancellationToken.None; + protected override Metadata ResponseTrailersCore => new(); + protected override Status StatusCore { get; set; } + protected override WriteOptions? WriteOptionsCore { get; set; } + protected override AuthContext AuthContextCore => new("test", new Dictionary>()); + + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options) => + throw new NotImplementedException(); + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) => Task.CompletedTask; +}