using System.Globalization;
using System.Net.Http;
using Grpc.Core;
using Grpc.Net.Client;
using ScadaLink.DataConnectionLayer.Adapters.LmxProxy.Grpc;

namespace ScadaLink.DataConnectionLayer.Adapters;

/// <summary>
/// Production ILmxProxyClient that talks to the LmxProxy gRPC service
/// using proto-generated client stubs with x-api-key header injection.
/// </summary>
internal class RealLmxProxyClient : ILmxProxyClient
{
    // NOTE(review): the generic type arguments on the public signatures below
    // (Task<LmxVtq>, IDictionary<string, LmxVtq>, Action<string, LmxVtq>, ...) are
    // inferred from the method bodies; confirm they match ILmxProxyClient exactly.

    private readonly string _host;
    private readonly int _port;
    private readonly string? _apiKey;
    private readonly int _samplingIntervalMs;
    private readonly bool _useTls;

    private GrpcChannel? _channel;
    private ScadaService.ScadaServiceClient? _client;
    private string? _sessionId;
    private Metadata? _headers;

    /// <summary>Stores the connection settings; no network activity happens until <see cref="ConnectAsync"/>.</summary>
    /// <param name="host">Host name or address of the LmxProxy service.</param>
    /// <param name="port">TCP port of the LmxProxy service.</param>
    /// <param name="apiKey">Optional API key, sent as the <c>x-api-key</c> header and in the connect request.</param>
    /// <param name="samplingIntervalMs">Value sent as <c>SamplingMs</c> in subscribe requests.</param>
    /// <param name="useTls">When true the channel uses <c>https</c>; otherwise plain <c>http</c>.</param>
    public RealLmxProxyClient(string host, int port, string? apiKey, int samplingIntervalMs = 0, bool useTls = false)
    {
        _host = host;
        _port = port;
        _apiKey = apiKey;
        _samplingIntervalMs = samplingIntervalMs;
        _useTls = useTls;
    }

    /// <summary>True while a client stub exists and a non-empty session id is held.</summary>
    public bool IsConnected => _client != null && !string.IsNullOrEmpty(_sessionId);

    /// <summary>
    /// Opens a gRPC channel and performs the Connect handshake, storing the returned session id.
    /// </summary>
    /// <param name="cancellationToken">Cancels the Connect call.</param>
    /// <exception cref="InvalidOperationException">The service answered with <c>Success == false</c>.</exception>
    public async Task ConnectAsync(CancellationToken cancellationToken = default)
    {
        if (!_useTls)
        {
            AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
        }

        // A reconnect must not leak the channel of the previous connection.
        ReleaseChannel();

        var scheme = _useTls ? "https" : "http";
        var channel = GrpcChannel.ForAddress($"{scheme}://{_host}:{_port}");
        try
        {
            var client = new ScadaService.ScadaServiceClient(channel);
            var headers = new Metadata();
            if (!string.IsNullOrEmpty(_apiKey))
            {
                headers.Add("x-api-key", _apiKey);
            }

            var response = await client.ConnectAsync(
                new ConnectRequest
                {
                    ClientId = $"ScadaLink-{Guid.NewGuid():N}",
                    ApiKey = _apiKey ?? string.Empty
                },
                headers,
                cancellationToken: cancellationToken);

            if (!response.Success)
            {
                throw new InvalidOperationException($"LmxProxy connect failed: {response.Message}");
            }

            // Publish the state only after a successful handshake so a failed
            // attempt never leaves a half-initialised client behind.
            _channel = channel;
            _client = client;
            _headers = headers;
            _sessionId = response.SessionId;
        }
        catch
        {
            channel.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Sends a best-effort Disconnect for the current session (errors are ignored)
    /// and clears the client stub and session id.
    /// </summary>
    public async Task DisconnectAsync()
    {
        var client = _client;
        var sessionId = _sessionId;

        if (client != null && !string.IsNullOrEmpty(sessionId))
        {
            try
            {
                await client.DisconnectAsync(new DisconnectRequest { SessionId = sessionId }, _headers);
            }
            catch
            {
                /* best-effort */
            }
        }

        _client = null;
        _sessionId = null;
    }

    /// <summary>Reads a single tag.</summary>
    /// <param name="address">Tag to read.</param>
    /// <param name="cancellationToken">Cancels the call.</param>
    /// <returns>The converted value/timestamp/quality triple.</returns>
    /// <exception cref="InvalidOperationException">Not connected, or the service reported failure.</exception>
    public async Task<LmxVtq> ReadAsync(string address, CancellationToken cancellationToken = default)
    {
        var (client, sessionId) = EnsureConnected();

        var response = await client.ReadAsync(
            new ReadRequest { SessionId = sessionId, Tag = address },
            _headers,
            cancellationToken: cancellationToken);

        if (!response.Success)
        {
            throw new InvalidOperationException($"Read failed for '{address}': {response.Message}");
        }

        return ConvertVtq(response.Vtq);
    }

    /// <summary>Reads several tags in one call.</summary>
    /// <param name="addresses">Tags to read.</param>
    /// <param name="cancellationToken">Cancels the call.</param>
    /// <returns>Results keyed by the tag reported in each response entry.</returns>
    /// <exception cref="InvalidOperationException">Not connected, or the service reported failure.</exception>
    public async Task<IDictionary<string, LmxVtq>> ReadBatchAsync(IEnumerable<string> addresses, CancellationToken cancellationToken = default)
    {
        var (client, sessionId) = EnsureConnected();

        var request = new ReadBatchRequest { SessionId = sessionId };
        request.Tags.AddRange(addresses);

        var response = await client.ReadBatchAsync(request, _headers, cancellationToken: cancellationToken);
        if (!response.Success)
        {
            throw new InvalidOperationException($"ReadBatch failed: {response.Message}");
        }

        // Indexer assignment instead of ToDictionary: a tag that appears twice in the
        // response must not fail the whole batch; the last entry wins.
        var results = new Dictionary<string, LmxVtq>();
        foreach (var vtq in response.Vtqs)
        {
            results[vtq.Tag] = ConvertVtq(vtq);
        }

        return results;
    }

    /// <summary>Writes a single tag; the value is sent as its invariant-culture string form.</summary>
    /// <param name="address">Tag to write.</param>
    /// <param name="value">Value to write; null is sent as an empty string.</param>
    /// <param name="cancellationToken">Cancels the call.</param>
    /// <exception cref="InvalidOperationException">Not connected, or the service reported failure.</exception>
    public async Task WriteAsync(string address, object value, CancellationToken cancellationToken = default)
    {
        var (client, sessionId) = EnsureConnected();

        var response = await client.WriteAsync(
            new WriteRequest { SessionId = sessionId, Tag = address, Value = FormatValue(value) },
            _headers,
            cancellationToken: cancellationToken);

        if (!response.Success)
        {
            throw new InvalidOperationException($"Write failed for '{address}': {response.Message}");
        }
    }

    /// <summary>Writes several tags in one call; values are sent as invariant-culture strings.</summary>
    /// <param name="values">Tag/value pairs to write.</param>
    /// <param name="cancellationToken">Cancels the call.</param>
    /// <exception cref="InvalidOperationException">Not connected, or the service reported failure.</exception>
    public async Task WriteBatchAsync(IDictionary<string, object> values, CancellationToken cancellationToken = default)
    {
        var (client, sessionId) = EnsureConnected();

        var request = new WriteBatchRequest { SessionId = sessionId };
        request.Items.AddRange(values.Select(kv => new WriteItem { Tag = kv.Key, Value = FormatValue(kv.Value) }));

        var response = await client.WriteBatchAsync(request, _headers, cancellationToken: cancellationToken);
        if (!response.Success)
        {
            throw new InvalidOperationException($"WriteBatch failed: {response.Message}");
        }
    }

    /// <summary>
    /// Opens a server-streaming subscription and forwards every update to
    /// <paramref name="onUpdate"/> from a background task.
    /// </summary>
    /// <param name="addresses">Tags to subscribe to.</param>
    /// <param name="onUpdate">Invoked with the tag and converted value for each streamed message.</param>
    /// <param name="onStreamError">
    /// Invoked when the stream ends without having been cancelled: the server closed it,
    /// a gRPC error occurred, or processing an update threw.
    /// </param>
    /// <param name="cancellationToken">Cancels the subscription, in addition to disposing the returned handle.</param>
    /// <returns>A handle whose disposal cancels the stream.</returns>
    /// <exception cref="InvalidOperationException">Not connected.</exception>
    public Task<ILmxSubscription> SubscribeAsync(
        IEnumerable<string> addresses,
        Action<string, LmxVtq> onUpdate,
        Action? onStreamError = null,
        CancellationToken cancellationToken = default)
    {
        var (client, sessionId) = EnsureConnected();

        var request = new SubscribeRequest { SessionId = sessionId, SamplingMs = _samplingIntervalMs };
        request.Tags.AddRange(addresses);

        var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Capture the token once: reading cts.Token after the subscription handle has
        // disposed the source throws ObjectDisposedException inside the pump loop.
        var token = cts.Token;

        try
        {
            var call = client.Subscribe(request, _headers, cancellationToken: token);

            _ = Task.Run(async () =>
            {
                try
                {
                    try
                    {
                        while (await call.ResponseStream.MoveNext(token))
                        {
                            var msg = call.ResponseStream.Current;
                            onUpdate(msg.Tag, ConvertVtq(msg));
                        }

                        // Stream ended normally (server closed) — treat as disconnect below.
                    }
                    catch (OperationCanceledException)
                    {
                        return;
                    }
                    catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
                    {
                        return;
                    }
                    catch (Exception)
                    {
                        // gRPC error (server offline, network failure) or a failure while
                        // handling an update: the stream is dead either way, so signal it
                        // instead of letting the background task fault unobserved.
                    }

                    // Invalidate only the session this stream belonged to, so a late failure
                    // of an old stream cannot wipe a session established by a reconnect.
                    Interlocked.CompareExchange(ref _sessionId, null, sessionId);
                    onStreamError?.Invoke();
                }
                finally
                {
                    call.Dispose();
                }
            });
        }
        catch
        {
            cts.Dispose();
            throw;
        }

        return Task.FromResult<ILmxSubscription>(new CtsSubscription(cts));
    }

    /// <summary>Disconnects (best-effort) and disposes the gRPC channel.</summary>
    public async ValueTask DisposeAsync()
    {
        await DisconnectAsync();
        ReleaseChannel();
    }

    /// <summary>Clears the connection state and disposes the current channel, if any.</summary>
    private void ReleaseChannel()
    {
        _client = null;
        _sessionId = null;
        _channel?.Dispose();
        _channel = null;
    }

    /// <summary>
    /// Returns a snapshot of the client stub and session id, so callers do not re-read
    /// fields that the subscription background task may clear concurrently.
    /// </summary>
    /// <exception cref="InvalidOperationException">No client or no session id is available.</exception>
    private (ScadaService.ScadaServiceClient Client, string SessionId) EnsureConnected()
    {
        var client = _client;
        var sessionId = _sessionId;

        if (client == null || string.IsNullOrEmpty(sessionId))
        {
            throw new InvalidOperationException("LmxProxy client is not connected.");
        }

        return (client, sessionId);
    }

    /// <summary>
    /// Formats a value for the wire using the invariant culture, so numbers do not
    /// depend on the machine locale. Null becomes an empty string.
    /// </summary>
    private static string FormatValue(object? value) =>
        Convert.ToString(value, CultureInfo.InvariantCulture) ?? string.Empty;

    /// <summary>
    /// Converts a wire message into an <c>LmxVtq</c>. A null message yields a null value
    /// with the current UTC time and Bad quality. A non-empty value string is parsed as a
    /// double, then as a bool, and otherwise kept as the string. Quality text is matched
    /// case-insensitively; anything other than GOOD or UNCERTAIN maps to Bad.
    /// </summary>
    private static LmxVtq ConvertVtq(VtqMessage? msg)
    {
        if (msg == null)
        {
            return new LmxVtq(null, DateTime.UtcNow, LmxQuality.Bad);
        }

        object? value = msg.Value;
        if (!string.IsNullOrEmpty(msg.Value))
        {
            // Same number styles as double.TryParse(string, out double), but with the
            // invariant culture so "1.5" parses identically on every machine.
            if (double.TryParse(
                    msg.Value,
                    NumberStyles.Float | NumberStyles.AllowThousands,
                    CultureInfo.InvariantCulture,
                    out var number))
            {
                value = number;
            }
            else if (bool.TryParse(msg.Value, out var flag))
            {
                value = flag;
            }
        }

        var timestamp = new DateTime(msg.TimestampUtcTicks, DateTimeKind.Utc);

        var quality = msg.Quality?.ToUpperInvariant() switch
        {
            "GOOD" => LmxQuality.Good,
            "UNCERTAIN" => LmxQuality.Uncertain,
            _ => LmxQuality.Bad
        };

        return new LmxVtq(value, timestamp, quality);
    }

    /// <summary>Subscription handle that cancels the stream's token source when disposed.</summary>
    private sealed class CtsSubscription(CancellationTokenSource cts) : ILmxSubscription
    {
        private int _disposed;

        /// <summary>Cancels and disposes the token source; repeated calls are no-ops.</summary>
        public ValueTask DisposeAsync()
        {
            if (Interlocked.Exchange(ref _disposed, 1) == 0)
            {
                try
                {
                    cts.Cancel();
                }
                finally
                {
                    cts.Dispose();
                }
            }

            return ValueTask.CompletedTask;
        }
    }
}

/// <summary>
/// Production factory that creates real LmxProxy gRPC clients.
/// </summary>
public class RealLmxProxyClientFactory : ILmxProxyClientFactory
{
    /// <summary>Creates a new, not yet connected client with the given settings.</summary>
    public ILmxProxyClient Create(string host, int port, string? apiKey, int samplingIntervalMs = 0, bool useTls = false)
        => new RealLmxProxyClient(host, port, apiKey, samplingIntervalMs, useTls);
}