Files
scadalink-design/infra/lmxfakeproxy/Services/ScadaServiceImpl.cs

256 lines
9.9 KiB
C#

using Grpc.Core;
using LmxFakeProxy.Bridge;
using LmxFakeProxy.Grpc;
using LmxFakeProxy.Sessions;
namespace LmxFakeProxy.Services;
public class ScadaServiceImpl : ScadaService.ScadaServiceBase
{
// Session store: Connect/Disconnect, session validation and API-key checks all go through it.
private readonly SessionManager _sessions;

// Bridge that performs the reads, writes and monitored-item subscriptions.
private readonly IOpcUaBridge _bridge;

// Converts the tag names found in requests into node ids accepted by the bridge.
private readonly TagMapper _tagMapper;

/// <summary>
/// Creates the service from its three collaborators; the references are stored as-is.
/// </summary>
/// <param name="sessions">Session manager used for connect/disconnect and session validation.</param>
/// <param name="bridge">Bridge used for read, write and subscription calls.</param>
/// <param name="tagMapper">Mapper used to translate request tags into node ids.</param>
public ScadaServiceImpl(SessionManager sessions, IOpcUaBridge bridge, TagMapper tagMapper)
{
    (_sessions, _bridge, _tagMapper) = (sessions, bridge, tagMapper);
}
/// <summary>
/// Asks the session manager to open a session for the supplied client id and API key
/// and relays its answer to the caller.
/// </summary>
/// <param name="request">Carries the client id and API key.</param>
/// <param name="context">gRPC call context (not used by this method).</param>
/// <returns>A completed task holding the success flag, message and session id from the session manager.</returns>
public override Task<ConnectResponse> Connect(ConnectRequest request, ServerCallContext context)
{
    var (accepted, text, newSessionId) = _sessions.Connect(request.ClientId, request.ApiKey);

    var reply = new ConnectResponse
    {
        Success = accepted,
        Message = text,
        SessionId = newSessionId,
    };
    return Task.FromResult(reply);
}
/// <summary>
/// Closes the session named in the request and reports whether the session manager knew it.
/// </summary>
/// <param name="request">Carries the id of the session to close.</param>
/// <param name="context">gRPC call context (not used by this method).</param>
/// <returns>
/// A completed task whose response says "Disconnected" on success and "Session not found" otherwise.
/// </returns>
public override Task<DisconnectResponse> Disconnect(DisconnectRequest request, ServerCallContext context)
{
    var removed = _sessions.Disconnect(request.SessionId);

    string text;
    if (removed)
    {
        text = "Disconnected";
    }
    else
    {
        text = "Session not found";
    }

    var reply = new DisconnectResponse { Success = removed, Message = text };
    return Task.FromResult(reply);
}
/// <summary>
/// Looks up the session named in the request and returns what the session manager reports for it.
/// </summary>
/// <param name="request">Carries the id of the session to look up.</param>
/// <param name="context">gRPC call context (not used by this method).</param>
/// <returns>
/// A completed task holding the lookup result, the client id and the connected-since tick count,
/// copied unchanged from the session manager.
/// </returns>
public override Task<GetConnectionStateResponse> GetConnectionState(
    GetConnectionStateRequest request, ServerCallContext context)
{
    var (sessionKnown, owner, sinceTicks) = _sessions.GetConnectionState(request.SessionId);

    var reply = new GetConnectionStateResponse
    {
        IsConnected = sessionKnown,
        ClientId = owner,
        ConnectedSinceUtcTicks = sinceTicks,
    };
    return Task.FromResult(reply);
}
/// <summary>
/// Checks an API key against the session manager without opening a session.
/// </summary>
/// <param name="request">Carries the API key to check.</param>
/// <param name="context">gRPC call context (not used by this method).</param>
/// <returns>
/// A completed task whose response says "Valid" when the key is accepted and "Invalid API key" otherwise.
/// </returns>
public override Task<CheckApiKeyResponse> CheckApiKey(CheckApiKeyRequest request, ServerCallContext context)
{
    var accepted = _sessions.CheckApiKey(request.ApiKey);

    string verdict;
    if (accepted)
    {
        verdict = "Valid";
    }
    else
    {
        verdict = "Invalid API key";
    }

    var reply = new CheckApiKeyResponse { IsValid = accepted, Message = verdict };
    return Task.FromResult(reply);
}
/// <summary>
/// Reads a single tag through the bridge on behalf of a validated session.
/// </summary>
/// <param name="request">Carries the session id and the tag to read.</param>
/// <param name="context">gRPC call context; its cancellation token is passed to the bridge.</param>
/// <returns>
/// A successful response carrying the tag's VTQ message, or a failed response whose message is either
/// "Invalid or expired session" or the message of the exception raised while mapping or reading.
/// </returns>
public override async Task<ReadResponse> Read(ReadRequest request, ServerCallContext context)
{
    var sessionIsValid = _sessions.ValidateSession(request.SessionId);
    if (!sessionIsValid)
    {
        return new ReadResponse { Success = false, Message = "Invalid or expired session" };
    }

    try
    {
        var sample = await _bridge.ReadAsync(_tagMapper.ToOpcNodeId(request.Tag), context.CancellationToken);
        var vtq = TagMapper.ToVtqMessage(request.Tag, sample.Value, sample.SourceTimestamp, sample.StatusCode);
        return new ReadResponse { Success = true, Vtq = vtq };
    }
    catch (Exception failure)
    {
        // Any failure is reported in the response body rather than as a gRPC error.
        return new ReadResponse { Success = false, Message = failure.Message };
    }
}
/// <summary>
/// Reads several tags one after another and returns one VTQ entry per requested tag, in request order.
/// </summary>
/// <param name="request">Carries the session id and the tags to read.</param>
/// <param name="context">gRPC call context; its cancellation token is passed to the bridge.</param>
/// <returns>
/// A failed response with "Invalid or expired session" when the session is rejected. Otherwise a response
/// whose Success stays true even if individual reads fail: a failed tag is represented by an entry with
/// an empty value and "Bad" quality, and Message holds the text of the most recent failure.
/// </returns>
public override async Task<ReadBatchResponse> ReadBatch(ReadBatchRequest request, ServerCallContext context)
{
    if (!_sessions.ValidateSession(request.SessionId))
    {
        return new ReadBatchResponse { Success = false, Message = "Invalid or expired session" };
    }

    var reply = new ReadBatchResponse { Success = true };
    var cancellation = context.CancellationToken;

    foreach (var tagName in request.Tags)
    {
        try
        {
            var sample = await _bridge.ReadAsync(_tagMapper.ToOpcNodeId(tagName), cancellation);
            var entry = TagMapper.ToVtqMessage(tagName, sample.Value, sample.SourceTimestamp, sample.StatusCode);
            reply.Vtqs.Add(entry);
        }
        catch (Exception failure)
        {
            // Keep the entry list aligned with the request: emit a placeholder for the failed tag.
            var placeholder = new VtqMessage
            {
                Tag = tagName,
                Value = "",
                Quality = "Bad",
                TimestampUtcTicks = DateTime.UtcNow.Ticks,
            };
            reply.Vtqs.Add(placeholder);
            reply.Message = failure.Message;
        }
    }

    return reply;
}
/// <summary>
/// Writes one value to one tag through the bridge on behalf of a validated session.
/// </summary>
/// <param name="request">Carries the session id, the target tag and the value to write.</param>
/// <param name="context">gRPC call context; its cancellation token is passed to the bridge.</param>
/// <returns>
/// Success when the bridge returns status code 0. Otherwise a failed response whose message is
/// "Invalid or expired session", the status code formatted as eight hex digits, or the message of
/// the exception raised while mapping, parsing or writing.
/// </returns>
public override async Task<WriteResponse> Write(WriteRequest request, ServerCallContext context)
{
    var sessionIsValid = _sessions.ValidateSession(request.SessionId);
    if (!sessionIsValid)
    {
        return new WriteResponse { Success = false, Message = "Invalid or expired session" };
    }

    try
    {
        var target = _tagMapper.ToOpcNodeId(request.Tag);
        var parsed = TagMapper.ParseWriteValue(request.Value);
        var status = await _bridge.WriteAsync(target, parsed, context.CancellationToken);

        if (status == 0)
        {
            return new WriteResponse { Success = true };
        }

        return new WriteResponse { Success = false, Message = $"OPC UA write failed: 0x{status:X8}" };
    }
    catch (Exception failure)
    {
        // Any failure is reported in the response body rather than as a gRPC error.
        return new WriteResponse { Success = false, Message = failure.Message };
    }
}
/// <summary>
/// Writes several tag/value pairs one after another and returns one result per item, in request order.
/// </summary>
/// <param name="request">Carries the session id and the items to write.</param>
/// <param name="context">gRPC call context; its cancellation token is passed to the bridge.</param>
/// <returns>
/// A failed response with "Invalid or expired session" when the session is rejected. Otherwise a response
/// whose Success is true only if every item was written with status code 0; each per-item result holds
/// either an empty message, the status code as eight hex digits, or the exception message.
/// </returns>
public override async Task<WriteBatchResponse> WriteBatch(WriteBatchRequest request, ServerCallContext context)
{
    if (!_sessions.ValidateSession(request.SessionId))
    {
        return new WriteBatchResponse { Success = false, Message = "Invalid or expired session" };
    }

    var reply = new WriteBatchResponse { Success = true };
    var cancellation = context.CancellationToken;

    foreach (var entry in request.Items)
    {
        try
        {
            var target = _tagMapper.ToOpcNodeId(entry.Tag);
            var parsed = TagMapper.ParseWriteValue(entry.Value);
            var status = await _bridge.WriteAsync(target, parsed, cancellation);
            var succeeded = status == 0;

            reply.Results.Add(new Grpc.WriteResult
            {
                Tag = entry.Tag,
                Success = succeeded,
                Message = succeeded ? "" : $"0x{status:X8}",
            });

            if (!succeeded)
            {
                reply.Success = false;
            }
        }
        catch (Exception failure)
        {
            // One bad item does not stop the batch; it is recorded and the batch is marked failed.
            reply.Results.Add(new Grpc.WriteResult { Tag = entry.Tag, Success = false, Message = failure.Message });
            reply.Success = false;
        }
    }

    return reply;
}
/// <summary>
/// Writes a batch of tag/value pairs and then polls a flag tag until it holds the expected value
/// or the timeout expires.
/// </summary>
/// <remarks>
/// The timeout clock starts before the writes, so time spent writing counts against the timeout.
/// A non-positive timeout falls back to 5000 ms and a non-positive poll interval to 100 ms.
/// Elapsed time is measured with a monotonic stopwatch, and the wait between polls is clamped to
/// the time remaining so the call does not run past its timeout by up to a full poll interval.
/// </remarks>
/// <param name="request">
/// Carries the session id, the items to write, the flag tag, the expected flag value and the
/// timeout / poll interval in milliseconds.
/// </param>
/// <param name="context">gRPC call context; its cancellation token is passed to the bridge and the delays.</param>
/// <returns>
/// A failed response when the session is rejected ("Invalid or expired session") or any write fails
/// ("Write failed", with the per-item results). Otherwise Success is true and FlagReached tells whether
/// the flag matched before the timeout; on timeout the message is "Timeout waiting for flag value".
/// </returns>
/// <exception cref="OperationCanceledException">The call was cancelled while waiting for the flag.</exception>
public override async Task<WriteBatchAndWaitResponse> WriteBatchAndWait(
    WriteBatchAndWaitRequest request, ServerCallContext context)
{
    if (!_sessions.ValidateSession(request.SessionId))
        return new WriteBatchAndWaitResponse { Success = false, Message = "Invalid or expired session" };

    // Monotonic clock: unaffected by wall-clock adjustments while the call is waiting.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    var writeResults = new List<Grpc.WriteResult>();
    var allWritesOk = true;

    foreach (var item in request.Items)
    {
        try
        {
            var nodeId = _tagMapper.ToOpcNodeId(item.Tag);
            var value = TagMapper.ParseWriteValue(item.Value);
            var statusCode = await _bridge.WriteAsync(nodeId, value, context.CancellationToken);
            writeResults.Add(new Grpc.WriteResult
            {
                Tag = item.Tag, Success = statusCode == 0,
                Message = statusCode == 0 ? "" : $"0x{statusCode:X8}"
            });
            if (statusCode != 0) allWritesOk = false;
        }
        catch (Exception ex)
        {
            writeResults.Add(new Grpc.WriteResult { Tag = item.Tag, Success = false, Message = ex.Message });
            allWritesOk = false;
        }
    }

    // The flag is only awaited when every write went through.
    if (!allWritesOk)
    {
        var failResp = new WriteBatchAndWaitResponse { Success = false, Message = "Write failed" };
        failResp.WriteResults.AddRange(writeResults);
        return failResp;
    }

    var flagNodeId = _tagMapper.ToOpcNodeId(request.FlagTag);
    var timeoutMs = request.TimeoutMs > 0 ? request.TimeoutMs : 5000;
    var pollMs = request.PollIntervalMs > 0 ? request.PollIntervalMs : 100;

    while (stopwatch.ElapsedMilliseconds < timeoutMs)
    {
        context.CancellationToken.ThrowIfCancellationRequested();
        try
        {
            var readResult = await _bridge.ReadAsync(flagNodeId, context.CancellationToken);
            if (readResult.Value?.ToString() == request.FlagValue)
            {
                var resp = new WriteBatchAndWaitResponse
                {
                    Success = true, FlagReached = true, ElapsedMs = (int)stopwatch.ElapsedMilliseconds
                };
                resp.WriteResults.AddRange(writeResults);
                return resp;
            }
        }
        catch (Exception) when (!context.CancellationToken.IsCancellationRequested)
        {
            // Best effort: a failed poll is treated as "flag not reached yet" and retried on the
            // next interval. Failures caused by cancellation of the call are not swallowed.
        }

        // Never sleep past the deadline, even when the poll interval exceeds the time left.
        var remainingMs = timeoutMs - stopwatch.ElapsedMilliseconds;
        if (remainingMs <= 0)
            break;
        await Task.Delay((int)Math.Min(pollMs, remainingMs), context.CancellationToken);
    }

    var finalResp = new WriteBatchAndWaitResponse
    {
        Success = true, FlagReached = false,
        ElapsedMs = (int)stopwatch.ElapsedMilliseconds,
        Message = "Timeout waiting for flag value"
    };
    finalResp.WriteResults.AddRange(writeResults);
    return finalResp;
}
/// <summary>
/// Streams value updates for the requested tags to the client until the call is cancelled.
/// </summary>
/// <remarks>
/// Bridge callbacks only enqueue updates into an in-memory channel; this method is the single consumer
/// that writes them to the response stream. That keeps writes strictly one at a time (the stream
/// writer allows only one pending write) and avoids blocking the callback thread on an async write.
/// Duplicate tags, or tags mapping to the same node id, are subscribed once; updates for such a node
/// are reported under the first tag that mapped to it.
/// </remarks>
/// <param name="request">Carries the session id, the tags to monitor and the sampling interval.</param>
/// <param name="responseStream">Server stream the VTQ messages are written to.</param>
/// <param name="context">gRPC call context; cancelling its token ends the subscription.</param>
/// <exception cref="RpcException">
/// Thrown with <see cref="StatusCode.Unauthenticated"/> when the session is invalid or expired.
/// </exception>
public override async Task Subscribe(
    SubscribeRequest request, IServerStreamWriter<VtqMessage> responseStream, ServerCallContext context)
{
    if (!_sessions.ValidateSession(request.SessionId))
        throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid or expired session"));

    // One entry per distinct node id (first tag wins) so repeated tags cannot break the lookup table.
    var subscriptions = request.Tags
        .Select(tag => (Tag: tag, NodeId: _tagMapper.ToOpcNodeId(tag)))
        .GroupBy(pair => pair.NodeId)
        .Select(group => group.First())
        .ToList();
    var nodeIds = subscriptions.Select(pair => pair.NodeId).ToList();
    var tagByNodeId = subscriptions.ToDictionary(pair => pair.NodeId, pair => pair.Tag);

    // Unbounded so that bridge callbacks never block; drained only by the loop below.
    var updates = System.Threading.Channels.Channel.CreateUnbounded<VtqMessage>(
        new System.Threading.Channels.UnboundedChannelOptions { SingleReader = true });

    var handle = await _bridge.AddMonitoredItemsAsync(
        nodeIds, request.SamplingMs,
        (nodeId, value, timestamp, statusCode) =>
        {
            if (tagByNodeId.TryGetValue(nodeId, out var tag))
            {
                // TryWrite returns false once the channel is completed, i.e. after the call ended.
                updates.Writer.TryWrite(TagMapper.ToVtqMessage(tag, value, timestamp, statusCode));
            }
        },
        context.CancellationToken);

    try
    {
        await foreach (var vtq in updates.Reader.ReadAllAsync(context.CancellationToken))
        {
            try
            {
                await responseStream.WriteAsync(vtq);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Best effort: an update that cannot be delivered is dropped and the subscription
                // stays alive until the call is cancelled.
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal termination: the client went away or the server is shutting the call down.
    }
    finally
    {
        // Stop accepting updates before tearing the subscription down so late callbacks are ignored.
        updates.Writer.TryComplete();
        await _bridge.RemoveMonitoredItemsAsync(handle);
    }
}
}