using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ProtoBuf.Grpc.Client;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Security;

namespace ZB.MOM.WW.LmxProxy.Client
{
    public partial class LmxProxyClient
    {
        /// <summary>
        /// Connects to the LmxProxy service and establishes a session.
        /// </summary>
        /// <remarks>
        /// If a client and session are already in place the call returns without doing anything.
        /// The token only governs the wait for the connection lock; it is not forwarded to the
        /// connect RPC itself.
        /// </remarks>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
        /// <exception cref="InvalidOperationException">The server rejected the session request.</exception>
        /// <exception cref="OperationCanceledException">
        /// The token was cancelled while waiting for the connection lock.
        /// </exception>
        public async Task ConnectAsync(CancellationToken cancellationToken = default)
        {
            // Owned by this method until it is handed over to _channel; the finally block
            // disposes it on every path where the hand-over did not happen.
            GrpcChannel? provisionalChannel = null;

            await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(LmxProxyClient));
                }

                if (_isConnected && _client != null && !string.IsNullOrEmpty(_sessionId))
                {
                    _logger.LogDebug("LmxProxyClient already connected to {Host}:{Port} with session {SessionId}",
                        _host, _port, _sessionId);
                    return;
                }

                string securityMode = _tlsConfiguration?.UseTls == true ? "TLS/SSL" : "INSECURE";
                _logger.LogInformation("Creating new {SecurityMode} connection to LmxProxy at {Host}:{Port}",
                    securityMode, _host, _port);

                Uri endpoint = BuildEndpointUri();
                provisionalChannel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger);

                // Create code-first gRPC client
                IScadaService provisionalClient = provisionalChannel.CreateGrpcService<IScadaService>();

                // Establish session with the server
                var connectRequest = new ConnectRequest
                {
                    ClientId = $"ScadaBridge-{Guid.NewGuid():N}",
                    ApiKey = _apiKey ?? string.Empty
                };

                ConnectResponse connectResponse =
                    await provisionalClient.ConnectAsync(connectRequest).ConfigureAwait(false);

                if (!connectResponse.Success)
                {
                    // The provisional channel is released by the finally block.
                    throw new InvalidOperationException($"Failed to establish session: {connectResponse.Message}");
                }

                // Dispose any existing channel before replacing it
                _channel?.Dispose();
                _channel = provisionalChannel;
                _client = provisionalClient;
                _sessionId = connectResponse.SessionId;
                _isConnected = true;

                // Ownership transferred to _channel; keep the finally block from disposing it.
                provisionalChannel = null;

                StartKeepAlive();

                _logger.LogInformation("Successfully connected to LmxProxy with session {SessionId}", _sessionId);
            }
            catch (Exception ex)
            {
                _isConnected = false;
                _client = null;
                _sessionId = string.Empty;
                _logger.LogError(ex, "Failed to connect to LmxProxy");

                // Leave the same state behind as DisconnectAsync/MarkDisconnectedAsync do:
                // no timer and no channel left over from an earlier connection.
                StopKeepAlive();
                _channel?.Dispose();
                _channel = null;

                throw;
            }
            finally
            {
                provisionalChannel?.Dispose();
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// (Re)starts the periodic keep-alive ping for the current session.
        /// </summary>
        /// <remarks>
        /// The callback does not take the connection lock for the ping itself, so it works on a
        /// snapshot of the client and session id. A failed ping stops the timer and marks the
        /// connection as disconnected, unless the session has changed in the meantime.
        /// </remarks>
        private void StartKeepAlive()
        {
            StopKeepAlive();

            // The lambda compiles to an async-void TimerCallback: nothing may escape from it,
            // otherwise the exception is raised on the thread pool with no observer.
            _keepAliveTimer = new Timer(async _ =>
            {
                // Snapshot the shared fields so a concurrent disconnect cannot null them out
                // between the check and the call.
                IScadaService? client = _client;
                var sessionId = _sessionId;

                if (!_isConnected || client == null || string.IsNullOrEmpty(sessionId))
                {
                    return;
                }

                try
                {
                    // Send a lightweight ping to keep session alive
                    var request = new GetConnectionStateRequest { SessionId = sessionId };
                    await client.GetConnectionStateAsync(request).ConfigureAwait(false);
                    _logger.LogDebug("Keep-alive ping sent successfully for session {SessionId}", sessionId);
                }
                catch (Exception ex)
                {
                    _logger.LogDebug(ex, "Keep-alive ping failed");

                    // A ping that belonged to an earlier session must not tear down whatever
                    // session (and timer) replaced it while the ping was in flight.
                    if (!string.Equals(sessionId, _sessionId, StringComparison.Ordinal))
                    {
                        return;
                    }

                    try
                    {
                        StopKeepAlive();
                        await MarkDisconnectedAsync(ex).ConfigureAwait(false);
                    }
                    catch (Exception cleanupEx)
                    {
                        _logger.LogWarning(cleanupEx, "Error while handling keep-alive failure");
                    }
                }
            }, null, _keepAliveInterval, _keepAliveInterval);
        }

        /// <summary>
        /// Stops the keep-alive timer, if one is running.
        /// </summary>
        private void StopKeepAlive()
        {
            _keepAliveTimer?.Dispose();
            _keepAliveTimer = null;
        }

        /// <summary>
        /// Disconnects from the LmxProxy service.
        /// </summary>
        /// <remarks>
        /// Errors from the disconnect RPC are logged and swallowed; the local client, session and
        /// channel are always released.
        /// </remarks>
        public async Task DisconnectAsync()
        {
            await _connectionLock.WaitAsync().ConfigureAwait(false);
            try
            {
                StopKeepAlive();

                if (_client != null && !string.IsNullOrEmpty(_sessionId))
                {
                    try
                    {
                        var request = new DisconnectRequest { SessionId = _sessionId };
                        await _client.DisconnectAsync(request).ConfigureAwait(false);
                        _logger.LogInformation("Session {SessionId} disconnected", _sessionId);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, "Error during disconnect");
                    }
                }

                _client = null;
                _sessionId = string.Empty;
                _isConnected = false;

                _channel?.Dispose();
                _channel = null;
            }
            finally
            {
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// Connects the LmxProxy to MxAccess (legacy method - session now established in ConnectAsync).
        /// </summary>
        /// <param name="cancellationToken">Cancellation token. Currently unused.</param>
        /// <returns>
        /// <c>(true, null)</c> when the client reports itself connected; otherwise <c>false</c>
        /// with a message asking the caller to use <see cref="ConnectAsync"/> first.
        /// </returns>
        public Task<(bool Success, string? ErrorMessage)> ConnectToMxAccessAsync(CancellationToken cancellationToken = default)
        {
            // Session is now established in ConnectAsync
            if (IsConnected)
            {
                return Task.FromResult((true, (string?)null));
            }

            return Task.FromResult<(bool Success, string? ErrorMessage)>((false, "Not connected. Call ConnectAsync first."));
        }

        /// <summary>
        /// Disconnects the LmxProxy from MxAccess (legacy method).
        /// </summary>
        /// <param name="cancellationToken">Cancellation token. Currently unused.</param>
        /// <returns>
        /// <c>(true, null)</c> when <see cref="DisconnectAsync"/> completes; otherwise <c>false</c>
        /// with the message of the exception it threw.
        /// </returns>
        public async Task<(bool Success, string? ErrorMessage)> DisconnectFromMxAccessAsync(CancellationToken cancellationToken = default)
        {
            try
            {
                await DisconnectAsync().ConfigureAwait(false);
                return (true, null);
            }
            catch (Exception ex)
            {
                return (false, ex.Message);
            }
        }

        /// <summary>
        /// Gets the connection state of the LmxProxy.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token. Currently unused.</param>
        /// <returns>The connected flag and client id reported by the server for the current session.</returns>
        public async Task<(bool IsConnected, string? ClientId)> GetConnectionStateAsync(CancellationToken cancellationToken = default)
        {
            EnsureConnected();

            var request = new GetConnectionStateRequest { SessionId = _sessionId };
            GetConnectionStateResponse response =
                await _client!.GetConnectionStateAsync(request).ConfigureAwait(false);
            return (response.IsConnected, response.ClientId);
        }

        /// <summary>
        /// Builds the gRPC endpoint URI (http/https) based on TLS configuration.
        /// </summary>
        /// <returns>An https URI when TLS is enabled in the configuration, otherwise an http URI.</returns>
        private Uri BuildEndpointUri()
        {
            string scheme = _tlsConfiguration?.UseTls == true ? Uri.UriSchemeHttps : Uri.UriSchemeHttp;
            return new UriBuilder
            {
                Scheme = scheme,
                Host = _host,
                Port = _port
            }.Uri;
        }

        /// <summary>
        /// Resets the connection state, releases the channel and disposes all active subscriptions.
        /// </summary>
        /// <remarks>Does nothing once the client has been disposed.</remarks>
        /// <param name="ex">The failure that triggered the disconnect, logged when provided.</param>
        private async Task MarkDisconnectedAsync(Exception? ex = null)
        {
            if (_disposed)
            {
                return;
            }

            await _connectionLock.WaitAsync().ConfigureAwait(false);
            try
            {
                _isConnected = false;
                _client = null;
                _sessionId = string.Empty;
                _channel?.Dispose();
                _channel = null;
            }
            finally
            {
                _connectionLock.Release();
            }

            // Take a snapshot under the subscription lock, then dispose outside of it so the
            // lock is not held across awaits.
            List<ISubscription> subsToDispose;
            lock (_subscriptionLock)
            {
                subsToDispose = new List<ISubscription>(_activeSubscriptions);
                _activeSubscriptions.Clear();
            }

            foreach (ISubscription sub in subsToDispose)
            {
                try
                {
                    await sub.DisposeAsync().ConfigureAwait(false);
                }
                catch (Exception disposeEx)
                {
                    _logger.LogWarning(disposeEx, "Error disposing subscription after disconnect");
                }
            }

            if (ex != null)
            {
                _logger.LogWarning(ex, "Connection marked disconnected due to keep-alive failure");
            }
        }
    }
}