using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Grpc.Core;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
using ZB.MOM.WW.LmxProxy.Host.Security;
using ZB.MOM.WW.LmxProxy.Host.Services;
using ZB.MOM.WW.LmxProxy.Host.Grpc;
namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
{
///
/// gRPC service implementation for SCADA operations.
/// Provides methods for connecting, reading, writing, batch operations, and subscriptions.
///
public class ScadaGrpcService : ScadaService.ScadaServiceBase
{
private static readonly ILogger Logger = Log.ForContext();
private readonly PerformanceMetrics _performanceMetrics;
private readonly IScadaClient _scadaClient;
private readonly SessionManager _sessionManager;
private readonly SubscriptionManager _subscriptionManager;
///
/// Initializes a new instance of the class.
///
/// The SCADA client instance.
/// The subscription manager instance.
/// The session manager instance.
/// Optional performance metrics service for tracking operations.
/// Thrown if any required argument is null.
public ScadaGrpcService(
IScadaClient scadaClient,
SubscriptionManager subscriptionManager,
SessionManager sessionManager,
PerformanceMetrics performanceMetrics = null)
{
_scadaClient = scadaClient ?? throw new ArgumentNullException(nameof(scadaClient));
_subscriptionManager = subscriptionManager ?? throw new ArgumentNullException(nameof(subscriptionManager));
_sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager));
_performanceMetrics = performanceMetrics;
}
#region Connection Management
///
/// Creates a new session for a client.
/// The MxAccess connection is managed separately at server startup.
///
/// The connection request with client ID and API key.
/// The gRPC server call context.
/// A with session ID.
public override Task Connect(ConnectRequest request, ServerCallContext context)
{
try
{
Logger.Information("Connect request from {Peer} - ClientId: {ClientId}",
context.Peer, request.ClientId);
// Validate that MxAccess is connected
if (!_scadaClient.IsConnected)
{
return Task.FromResult(new ConnectResponse
{
Success = false,
Message = "SCADA server is not connected to MxAccess",
SessionId = string.Empty
});
}
// Create a new session
var sessionId = _sessionManager.CreateSession(request.ClientId, request.ApiKey);
return Task.FromResult(new ConnectResponse
{
Success = true,
Message = "Session created successfully",
SessionId = sessionId
});
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to create session for client {ClientId}", request.ClientId);
return Task.FromResult(new ConnectResponse
{
Success = false,
Message = ex.Message,
SessionId = string.Empty
});
}
}
///
/// Terminates a client session.
///
/// The disconnect request with session ID.
/// The gRPC server call context.
/// A indicating success or failure.
public override Task Disconnect(DisconnectRequest request, ServerCallContext context)
{
try
{
Logger.Information("Disconnect request from {Peer} - SessionId: {SessionId}",
context.Peer, request.SessionId);
var terminated = _sessionManager.TerminateSession(request.SessionId);
return Task.FromResult(new DisconnectResponse
{
Success = terminated,
Message = terminated ? "Session terminated successfully" : "Session not found"
});
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to disconnect session {SessionId}", request.SessionId);
return Task.FromResult(new DisconnectResponse
{
Success = false,
Message = ex.Message
});
}
}
///
/// Gets the connection state for a session.
///
/// The connection state request with session ID.
/// The gRPC server call context.
/// A with connection details.
public override Task GetConnectionState(GetConnectionStateRequest request,
ServerCallContext context)
{
var session = _sessionManager.GetSession(request.SessionId);
if (session == null)
{
return Task.FromResult(new GetConnectionStateResponse
{
IsConnected = false,
ClientId = string.Empty,
ConnectedSinceUtcTicks = 0
});
}
return Task.FromResult(new GetConnectionStateResponse
{
IsConnected = _scadaClient.IsConnected,
ClientId = session.ClientId,
ConnectedSinceUtcTicks = session.ConnectedSinceUtcTicks
});
}
#endregion
#region Read Operations
///
/// Reads a single tag value from the SCADA system.
///
/// The read request with session ID and tag.
/// The gRPC server call context.
/// A with the VTQ data.
public override async Task Read(ReadRequest request, ServerCallContext context)
{
using (PerformanceMetrics.ITimingScope scope = _performanceMetrics?.BeginOperation("Read"))
{
try
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
return new ReadResponse
{
Success = false,
Message = "Invalid session ID",
Vtq = CreateBadVtqMessage(request.Tag)
};
}
Logger.Debug("Read request from {Peer} for {Tag}", context.Peer, request.Tag);
Vtq vtq = await _scadaClient.ReadAsync(request.Tag, context.CancellationToken);
scope?.SetSuccess(true);
return new ReadResponse
{
Success = true,
Message = string.Empty,
Vtq = ConvertToVtqMessage(request.Tag, vtq)
};
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to read {Tag}", request.Tag);
scope?.SetSuccess(false);
return new ReadResponse
{
Success = false,
Message = ex.Message,
Vtq = CreateBadVtqMessage(request.Tag)
};
}
}
}
///
/// Reads multiple tag values from the SCADA system.
///
/// The batch read request with session ID and tags.
/// The gRPC server call context.
/// A with VTQ data for each tag.
public override async Task ReadBatch(ReadBatchRequest request, ServerCallContext context)
{
using (PerformanceMetrics.ITimingScope scope = _performanceMetrics?.BeginOperation("ReadBatch"))
{
try
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
var badResponse = new ReadBatchResponse
{
Success = false,
Message = "Invalid session ID"
};
foreach (var tag in request.Tags)
{
badResponse.Vtqs.Add(CreateBadVtqMessage(tag));
}
return badResponse;
}
Logger.Debug("ReadBatch request from {Peer} for {Count} tags", context.Peer, request.Tags.Count);
IReadOnlyDictionary results =
await _scadaClient.ReadBatchAsync(request.Tags, context.CancellationToken);
var response = new ReadBatchResponse
{
Success = true,
Message = string.Empty
};
// Return results in the same order as the request tags
foreach (var tag in request.Tags)
{
if (results.TryGetValue(tag, out Vtq vtq))
{
response.Vtqs.Add(ConvertToVtqMessage(tag, vtq));
}
else
{
response.Vtqs.Add(CreateBadVtqMessage(tag));
}
}
scope?.SetSuccess(true);
return response;
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to read batch");
scope?.SetSuccess(false);
var response = new ReadBatchResponse
{
Success = false,
Message = ex.Message
};
foreach (var tag in request.Tags)
{
response.Vtqs.Add(CreateBadVtqMessage(tag));
}
return response;
}
}
}
#endregion
#region Write Operations
///
/// Writes a single tag value to the SCADA system.
///
/// The write request with session ID, tag, and value.
/// The gRPC server call context.
/// A indicating success or failure.
public override async Task Write(WriteRequest request, ServerCallContext context)
{
using (PerformanceMetrics.ITimingScope scope = _performanceMetrics?.BeginOperation("Write"))
{
try
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
return new WriteResponse
{
Success = false,
Message = "Invalid session ID"
};
}
Logger.Debug("Write request from {Peer} for {Tag}", context.Peer, request.Tag);
// Parse the string value to an appropriate type
var value = ParseValue(request.Value);
await _scadaClient.WriteAsync(request.Tag, value, context.CancellationToken);
scope?.SetSuccess(true);
return new WriteResponse
{
Success = true,
Message = string.Empty
};
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to write to {Tag}", request.Tag);
scope?.SetSuccess(false);
return new WriteResponse
{
Success = false,
Message = ex.Message
};
}
}
}
///
/// Writes multiple tag values to the SCADA system.
///
/// The batch write request with session ID and items.
/// The gRPC server call context.
/// A with results for each tag.
public override async Task WriteBatch(WriteBatchRequest request, ServerCallContext context)
{
using (PerformanceMetrics.ITimingScope scope = _performanceMetrics?.BeginOperation("WriteBatch"))
{
try
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
var badResponse = new WriteBatchResponse
{
Success = false,
Message = "Invalid session ID"
};
foreach (var item in request.Items)
{
badResponse.Results.Add(new WriteResult
{
Tag = item.Tag,
Success = false,
Message = "Invalid session ID"
});
}
return badResponse;
}
Logger.Debug("WriteBatch request from {Peer} for {Count} items", context.Peer, request.Items.Count);
var values = new Dictionary();
foreach (var item in request.Items)
{
values[item.Tag] = ParseValue(item.Value);
}
await _scadaClient.WriteBatchAsync(values, context.CancellationToken);
scope?.SetSuccess(true);
var response = new WriteBatchResponse
{
Success = true,
Message = string.Empty
};
foreach (var item in request.Items)
{
response.Results.Add(new WriteResult
{
Tag = item.Tag,
Success = true,
Message = string.Empty
});
}
return response;
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to write batch");
scope?.SetSuccess(false);
var response = new WriteBatchResponse
{
Success = false,
Message = ex.Message
};
foreach (var item in request.Items)
{
response.Results.Add(new WriteResult
{
Tag = item.Tag,
Success = false,
Message = ex.Message
});
}
return response;
}
}
}
///
/// Writes a batch of tag values and waits for a flag tag to reach a specific value.
///
/// The batch write and wait request.
/// The gRPC server call context.
/// A with results and flag status.
public override async Task WriteBatchAndWait(WriteBatchAndWaitRequest request,
ServerCallContext context)
{
var startTime = DateTime.UtcNow;
try
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
var badResponse = new WriteBatchAndWaitResponse
{
Success = false,
Message = "Invalid session ID",
FlagReached = false,
ElapsedMs = 0
};
foreach (var item in request.Items)
{
badResponse.WriteResults.Add(new WriteResult
{
Tag = item.Tag,
Success = false,
Message = "Invalid session ID"
});
}
return badResponse;
}
Logger.Debug("WriteBatchAndWait request from {Peer}", context.Peer);
var values = new Dictionary();
foreach (var item in request.Items)
{
values[item.Tag] = ParseValue(item.Value);
}
var flagValue = ParseValue(request.FlagValue);
var pollInterval = request.PollIntervalMs > 0 ? request.PollIntervalMs : 100;
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
cts.CancelAfter(TimeSpan.FromMilliseconds(request.TimeoutMs));
// Write the batch first
await _scadaClient.WriteBatchAsync(values, cts.Token);
// Poll for the flag value
var flagReached = false;
while (!cts.Token.IsCancellationRequested)
{
try
{
var flagVtq = await _scadaClient.ReadAsync(request.FlagTag, cts.Token);
if (flagVtq.Value != null && AreValuesEqual(flagVtq.Value, flagValue))
{
flagReached = true;
break;
}
await Task.Delay(pollInterval, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
}
var elapsedMs = (int)(DateTime.UtcNow - startTime).TotalMilliseconds;
var response = new WriteBatchAndWaitResponse
{
Success = true,
Message = string.Empty,
FlagReached = flagReached,
ElapsedMs = elapsedMs
};
foreach (var item in request.Items)
{
response.WriteResults.Add(new WriteResult
{
Tag = item.Tag,
Success = true,
Message = string.Empty
});
}
return response;
}
catch (Exception ex)
{
Logger.Error(ex, "Failed to write batch and wait");
var elapsedMs = (int)(DateTime.UtcNow - startTime).TotalMilliseconds;
var response = new WriteBatchAndWaitResponse
{
Success = false,
Message = ex.Message,
FlagReached = false,
ElapsedMs = elapsedMs
};
foreach (var item in request.Items)
{
response.WriteResults.Add(new WriteResult
{
Tag = item.Tag,
Success = false,
Message = ex.Message
});
}
return response;
}
}
#endregion
#region Subscription Operations
///
/// Subscribes to value changes for specified tags and streams updates to the client.
///
/// The subscribe request with session ID and tags.
/// The server stream writer for VTQ updates.
/// The gRPC server call context.
/// A task representing the asynchronous operation.
public override async Task Subscribe(SubscribeRequest request,
IServerStreamWriter responseStream, ServerCallContext context)
{
// Validate session
if (!_sessionManager.ValidateSession(request.SessionId))
{
Logger.Warning("Subscribe failed: Invalid session ID {SessionId}", request.SessionId);
throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid session ID"));
}
var clientId = Guid.NewGuid().ToString();
try
{
Logger.Information("Subscribe request from {Peer} with client ID {ClientId} for {Count} tags",
context.Peer, clientId, request.Tags.Count);
Channel<(string address, Vtq vtq)> channel = await _subscriptionManager.SubscribeAsync(
clientId,
request.Tags,
context.CancellationToken);
// Stream updates to the client until cancelled
while (!context.CancellationToken.IsCancellationRequested)
{
try
{
while (await channel.Reader.WaitToReadAsync(context.CancellationToken))
{
if (channel.Reader.TryRead(out (string address, Vtq vtq) item))
{
var vtqMessage = ConvertToVtqMessage(item.address, item.vtq);
await responseStream.WriteAsync(vtqMessage);
}
}
}
catch (OperationCanceledException)
{
break;
}
}
}
catch (OperationCanceledException)
{
Logger.Information("Subscription cancelled for client {ClientId}", clientId);
}
catch (Exception ex)
{
Logger.Error(ex, "Error in subscription for client {ClientId}", clientId);
throw;
}
finally
{
_subscriptionManager.UnsubscribeClient(clientId);
}
}
#endregion
#region Authentication
///
/// Checks the validity of an API key.
///
/// The API key check request.
/// The gRPC server call context.
/// A with validity and details.
public override Task CheckApiKey(CheckApiKeyRequest request, ServerCallContext context)
{
var response = new CheckApiKeyResponse
{
IsValid = false,
Message = "API key validation failed"
};
// Check if API key was validated by interceptor
if (context.UserState.TryGetValue("ApiKey", out object apiKeyObj) && apiKeyObj is ApiKey apiKey)
{
response.IsValid = apiKey.IsValid();
response.Message = apiKey.IsValid()
? $"API key is valid (Role: {apiKey.Role})"
: "API key is disabled";
Logger.Information("API key check - Valid: {IsValid}, Role: {Role}",
response.IsValid, apiKey.Role);
}
else
{
Logger.Warning("API key check failed - no API key in context");
}
return Task.FromResult(response);
}
#endregion
#region Value Conversion Helpers
///
/// Converts a domain to a gRPC .
///
private static VtqMessage ConvertToVtqMessage(string tag, Vtq vtq)
{
return new VtqMessage
{
Tag = tag,
Value = ConvertValueToString(vtq.Value),
TimestampUtcTicks = vtq.Timestamp.Ticks,
Quality = ConvertQualityToString(vtq.Quality)
};
}
///
/// Creates a bad quality VTQ message for error cases.
///
private static VtqMessage CreateBadVtqMessage(string tag)
{
return new VtqMessage
{
Tag = tag,
Value = string.Empty,
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = "Bad"
};
}
///
/// Converts a value to its string representation.
///
private static string ConvertValueToString(object value)
{
if (value == null)
{
return string.Empty;
}
return value switch
{
bool b => b.ToString().ToLowerInvariant(),
DateTime dt => dt.ToUniversalTime().ToString("O"),
DateTimeOffset dto => dto.ToString("O"),
float f => f.ToString(CultureInfo.InvariantCulture),
double d => d.ToString(CultureInfo.InvariantCulture),
decimal dec => dec.ToString(CultureInfo.InvariantCulture),
Array => JsonSerializer.Serialize(value, value.GetType()),
_ => value.ToString() ?? string.Empty
};
}
///
/// Converts a domain quality value to a string.
///
private static string ConvertQualityToString(Domain.Quality quality)
{
// Simplified quality mapping for the new API
var qualityValue = (int)quality;
if (qualityValue >= 192) // Good family
{
return "Good";
}
if (qualityValue >= 64) // Uncertain family
{
return "Uncertain";
}
return "Bad"; // Bad family
}
///
/// Parses a string value to an appropriate .NET type.
///
private static object ParseValue(string value)
{
if (string.IsNullOrEmpty(value))
{
return string.Empty;
}
// Try to parse as boolean
if (bool.TryParse(value, out bool boolResult))
{
return boolResult;
}
// Try to parse as integer
if (int.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out int intResult))
{
return intResult;
}
// Try to parse as long
if (long.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out long longResult))
{
return longResult;
}
// Try to parse as double
if (double.TryParse(value, NumberStyles.Float | NumberStyles.AllowThousands, CultureInfo.InvariantCulture,
out double doubleResult))
{
return doubleResult;
}
// Try to parse as DateTime
if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind,
out DateTime dateResult))
{
return dateResult;
}
// Return as string
return value;
}
///
/// Compares two values for equality.
///
private static bool AreValuesEqual(object value1, object value2)
{
if (value1 == null && value2 == null)
{
return true;
}
if (value1 == null || value2 == null)
{
return false;
}
// Convert both to strings for comparison
var str1 = ConvertValueToString(value1);
var str2 = ConvertValueToString(value2);
return string.Equals(str1, str2, StringComparison.OrdinalIgnoreCase);
}
#endregion
}
}