feat(lmxproxy): phase 3 — host gRPC server, security, configuration, service hosting
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,412 @@
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Grpc.Core;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.LmxProxy.Host.Domain;
|
||||
using ZB.MOM.WW.LmxProxy.Host.Sessions;
|
||||
using ZB.MOM.WW.LmxProxy.Host.Subscriptions;
|
||||
|
||||
namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
|
||||
{
|
||||
/// <summary>
|
||||
/// gRPC service implementation for all 10 SCADA RPCs.
|
||||
/// Inherits from proto-generated ScadaService.ScadaServiceBase.
|
||||
/// </summary>
|
||||
public class ScadaGrpcService : Scada.ScadaService.ScadaServiceBase
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<ScadaGrpcService>();
|
||||
|
||||
private readonly IScadaClient _scadaClient;
|
||||
private readonly SessionManager _sessionManager;
|
||||
private readonly SubscriptionManager _subscriptionManager;
|
||||
|
||||
public ScadaGrpcService(
|
||||
IScadaClient scadaClient,
|
||||
SessionManager sessionManager,
|
||||
SubscriptionManager subscriptionManager)
|
||||
{
|
||||
_scadaClient = scadaClient;
|
||||
_sessionManager = sessionManager;
|
||||
_subscriptionManager = subscriptionManager;
|
||||
}
|
||||
|
||||
// -- Connection Management ------------------------------------
|
||||
|
||||
public override Task<Scada.ConnectResponse> Connect(
|
||||
Scada.ConnectRequest request, ServerCallContext context)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!_scadaClient.IsConnected)
|
||||
{
|
||||
return Task.FromResult(new Scada.ConnectResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = "MxAccess is not connected"
|
||||
});
|
||||
}
|
||||
|
||||
var sessionId = _sessionManager.CreateSession(request.ClientId, request.ApiKey);
|
||||
|
||||
return Task.FromResult(new Scada.ConnectResponse
|
||||
{
|
||||
Success = true,
|
||||
Message = "Connected",
|
||||
SessionId = sessionId
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Connect failed for client {ClientId}", request.ClientId);
|
||||
return Task.FromResult(new Scada.ConnectResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = ex.Message
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public override Task<Scada.DisconnectResponse> Disconnect(
|
||||
Scada.DisconnectRequest request, ServerCallContext context)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Clean up subscriptions for this session
|
||||
_subscriptionManager.UnsubscribeClient(request.SessionId);
|
||||
|
||||
var terminated = _sessionManager.TerminateSession(request.SessionId);
|
||||
return Task.FromResult(new Scada.DisconnectResponse
|
||||
{
|
||||
Success = terminated,
|
||||
Message = terminated ? "Disconnected" : "Session not found"
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Disconnect failed for session {SessionId}", request.SessionId);
|
||||
return Task.FromResult(new Scada.DisconnectResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = ex.Message
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public override Task<Scada.GetConnectionStateResponse> GetConnectionState(
|
||||
Scada.GetConnectionStateRequest request, ServerCallContext context)
|
||||
{
|
||||
var session = _sessionManager.GetSession(request.SessionId);
|
||||
return Task.FromResult(new Scada.GetConnectionStateResponse
|
||||
{
|
||||
IsConnected = _scadaClient.IsConnected,
|
||||
ClientId = session?.ClientId ?? "",
|
||||
ConnectedSinceUtcTicks = session?.ConnectedSinceUtcTicks ?? 0
|
||||
});
|
||||
}
|
||||
|
||||
// -- Read Operations ------------------------------------------
|
||||
|
||||
public override async Task<Scada.ReadResponse> Read(
|
||||
Scada.ReadRequest request, ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
return new Scada.ReadResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = "Invalid session",
|
||||
Vtq = CreateBadVtq(request.Tag, QualityCodeMapper.Bad())
|
||||
};
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var vtq = await _scadaClient.ReadAsync(request.Tag, context.CancellationToken);
|
||||
return new Scada.ReadResponse
|
||||
{
|
||||
Success = true,
|
||||
Message = "",
|
||||
Vtq = ConvertToProtoVtq(request.Tag, vtq)
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Read failed for tag {Tag}", request.Tag);
|
||||
return new Scada.ReadResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = ex.Message,
|
||||
Vtq = CreateBadVtq(request.Tag, QualityCodeMapper.BadCommunicationFailure())
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task<Scada.ReadBatchResponse> ReadBatch(
|
||||
Scada.ReadBatchRequest request, ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
return new Scada.ReadBatchResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = "Invalid session"
|
||||
};
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var results = await _scadaClient.ReadBatchAsync(request.Tags, context.CancellationToken);
|
||||
|
||||
var response = new Scada.ReadBatchResponse
|
||||
{
|
||||
Success = true,
|
||||
Message = ""
|
||||
};
|
||||
|
||||
// Return results in request order
|
||||
foreach (var tag in request.Tags)
|
||||
{
|
||||
if (results.TryGetValue(tag, out var vtq))
|
||||
{
|
||||
response.Vtqs.Add(ConvertToProtoVtq(tag, vtq));
|
||||
}
|
||||
else
|
||||
{
|
||||
response.Vtqs.Add(CreateBadVtq(tag, QualityCodeMapper.BadConfigurationError()));
|
||||
}
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "ReadBatch failed");
|
||||
return new Scada.ReadBatchResponse
|
||||
{
|
||||
Success = false,
|
||||
Message = ex.Message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// -- Write Operations -----------------------------------------
|
||||
|
||||
public override async Task<Scada.WriteResponse> Write(
|
||||
Scada.WriteRequest request, ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
return new Scada.WriteResponse { Success = false, Message = "Invalid session" };
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var value = TypedValueConverter.FromTypedValue(request.Value);
|
||||
await _scadaClient.WriteAsync(request.Tag, value!, context.CancellationToken);
|
||||
return new Scada.WriteResponse { Success = true, Message = "" };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Write failed for tag {Tag}", request.Tag);
|
||||
return new Scada.WriteResponse { Success = false, Message = ex.Message };
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task<Scada.WriteBatchResponse> WriteBatch(
|
||||
Scada.WriteBatchRequest request, ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
return new Scada.WriteBatchResponse { Success = false, Message = "Invalid session" };
|
||||
}
|
||||
|
||||
var response = new Scada.WriteBatchResponse { Success = true, Message = "" };
|
||||
|
||||
foreach (var item in request.Items)
|
||||
{
|
||||
try
|
||||
{
|
||||
var value = TypedValueConverter.FromTypedValue(item.Value);
|
||||
await _scadaClient.WriteAsync(item.Tag, value!, context.CancellationToken);
|
||||
response.Results.Add(new Scada.WriteResult
|
||||
{
|
||||
Tag = item.Tag, Success = true, Message = ""
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
response.Success = false;
|
||||
response.Results.Add(new Scada.WriteResult
|
||||
{
|
||||
Tag = item.Tag, Success = false, Message = ex.Message
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
public override async Task<Scada.WriteBatchAndWaitResponse> WriteBatchAndWait(
|
||||
Scada.WriteBatchAndWaitRequest request, ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
return new Scada.WriteBatchAndWaitResponse { Success = false, Message = "Invalid session" };
|
||||
}
|
||||
|
||||
var response = new Scada.WriteBatchAndWaitResponse { Success = true };
|
||||
|
||||
try
|
||||
{
|
||||
// Execute writes and collect results
|
||||
foreach (var item in request.Items)
|
||||
{
|
||||
try
|
||||
{
|
||||
var value = TypedValueConverter.FromTypedValue(item.Value);
|
||||
await _scadaClient.WriteAsync(item.Tag, value!, context.CancellationToken);
|
||||
response.WriteResults.Add(new Scada.WriteResult
|
||||
{
|
||||
Tag = item.Tag, Success = true, Message = ""
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
response.Success = false;
|
||||
response.Message = "One or more writes failed";
|
||||
response.WriteResults.Add(new Scada.WriteResult
|
||||
{
|
||||
Tag = item.Tag, Success = false, Message = ex.Message
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// If any write failed, return immediately
|
||||
if (!response.Success)
|
||||
return response;
|
||||
|
||||
// Poll flag tag
|
||||
var flagValue = TypedValueConverter.FromTypedValue(request.FlagValue);
|
||||
var timeoutMs = request.TimeoutMs > 0 ? request.TimeoutMs : 5000;
|
||||
var pollIntervalMs = request.PollIntervalMs > 0 ? request.PollIntervalMs : 100;
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
while (sw.ElapsedMilliseconds < timeoutMs)
|
||||
{
|
||||
context.CancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var vtq = await _scadaClient.ReadAsync(request.FlagTag, context.CancellationToken);
|
||||
if (vtq.Quality.IsGood() && TypedValueComparer.Equals(vtq.Value, flagValue))
|
||||
{
|
||||
response.FlagReached = true;
|
||||
response.ElapsedMs = (int)sw.ElapsedMilliseconds;
|
||||
return response;
|
||||
}
|
||||
|
||||
await Task.Delay(pollIntervalMs, context.CancellationToken);
|
||||
}
|
||||
|
||||
// Timeout -- not an error
|
||||
response.FlagReached = false;
|
||||
response.ElapsedMs = (int)sw.ElapsedMilliseconds;
|
||||
return response;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "WriteBatchAndWait failed");
|
||||
return new Scada.WriteBatchAndWaitResponse
|
||||
{
|
||||
Success = false, Message = ex.Message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// -- Subscription ---------------------------------------------
|
||||
|
||||
public override async Task Subscribe(
|
||||
Scada.SubscribeRequest request,
|
||||
IServerStreamWriter<Scada.VtqMessage> responseStream,
|
||||
ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid session"));
|
||||
}
|
||||
|
||||
var reader = _subscriptionManager.Subscribe(
|
||||
request.SessionId, request.Tags, context.CancellationToken);
|
||||
|
||||
try
|
||||
{
|
||||
while (await reader.WaitToReadAsync(context.CancellationToken))
|
||||
{
|
||||
while (reader.TryRead(out var item))
|
||||
{
|
||||
var protoVtq = ConvertToProtoVtq(item.address, item.vtq);
|
||||
await responseStream.WriteAsync(protoVtq);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Client disconnected -- normal
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Subscribe stream error for session {SessionId}", request.SessionId);
|
||||
throw new RpcException(new Status(StatusCode.Internal, ex.Message));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_subscriptionManager.UnsubscribeClient(request.SessionId);
|
||||
}
|
||||
}
|
||||
|
||||
// -- API Key Check --------------------------------------------
|
||||
|
||||
public override Task<Scada.CheckApiKeyResponse> CheckApiKey(
|
||||
Scada.CheckApiKeyRequest request, ServerCallContext context)
|
||||
{
|
||||
// The interceptor already validated the x-api-key header.
|
||||
// This RPC lets clients explicitly check a specific key.
|
||||
// The validated key from the interceptor is in context.UserState.
|
||||
var isValid = context.UserState.ContainsKey("ApiKey");
|
||||
return Task.FromResult(new Scada.CheckApiKeyResponse
|
||||
{
|
||||
IsValid = isValid,
|
||||
Message = isValid ? "Valid" : "Invalid"
|
||||
});
|
||||
}
|
||||
|
||||
// -- Helpers --------------------------------------------------
|
||||
|
||||
/// <summary>Converts a domain Vtq to a proto VtqMessage.</summary>
|
||||
private static Scada.VtqMessage ConvertToProtoVtq(string tag, Vtq vtq)
|
||||
{
|
||||
return new Scada.VtqMessage
|
||||
{
|
||||
Tag = tag,
|
||||
Value = TypedValueConverter.ToTypedValue(vtq.Value),
|
||||
TimestampUtcTicks = vtq.Timestamp.Ticks,
|
||||
Quality = QualityCodeMapper.ToQualityCode(vtq.Quality)
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>Creates a VtqMessage with bad quality for error responses.</summary>
|
||||
private static Scada.VtqMessage CreateBadVtq(string tag, Scada.QualityCode quality)
|
||||
{
|
||||
return new Scada.VtqMessage
|
||||
{
|
||||
Tag = tag,
|
||||
TimestampUtcTicks = DateTime.UtcNow.Ticks,
|
||||
Quality = quality
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user