using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.LmxProxy.Client.Domain;

namespace ZB.MOM.WW.LmxProxy.Client
{
    public partial class LmxProxyClient
    {
        /// <summary>
        /// Subscription that enumerates the update stream returned by
        /// <c>IScadaService.SubscribeAsync</c> and forwards every received item to a callback
        /// until it is cancelled, the stream ends, or the stream faults.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the generic type arguments on <c>List</c> and <c>Action</c> below were
        /// reconstructed from how the values are used in this class (tags are described as
        /// addresses, the update callback receives <c>vtq.Tag</c> plus a converted <c>Vtq</c>,
        /// the dispose callback receives <c>this</c>) — confirm against the callers.
        /// </remarks>
        private class CodeFirstSubscription : ISubscription
        {
            /// <summary>Sampling interval, in milliseconds, sent with every subscribe request.</summary>
            private const int DefaultSamplingMs = 1000;

            /// <summary>Upper bound on how long the synchronous <see cref="Dispose"/> blocks.</summary>
            private static readonly TimeSpan SyncDisposeTimeout = TimeSpan.FromSeconds(5);

            private readonly IScadaService _client;
            private readonly string _sessionId;
            private readonly List<string> _tags;
            private readonly Action<string, Vtq> _onUpdate;
            private readonly ILogger _logger;
            private readonly Action<ISubscription>? _onDispose;
            private readonly CancellationTokenSource _cts = new();
            private Task? _processingTask;

            // 0 = live, 1 = DisposeAsync has been entered. Updated atomically so that
            // concurrent dispose calls release _cts exactly once.
            private int _disposed;

            // 0 = _onDispose not yet invoked, 1 = invoked. Kept separate from _disposed so that
            // a stream which ends by itself can notify the owner while still allowing a later
            // DisposeAsync/Dispose call to release _cts.
            private int _disposeNotified;

            /// <summary>
            /// Initializes a new instance of the CodeFirstSubscription class.
            /// </summary>
            /// <param name="client">The gRPC ScadaService client.</param>
            /// <param name="sessionId">The session identifier.</param>
            /// <param name="tags">The list of tag addresses to subscribe to.</param>
            /// <param name="onUpdate">Callback invoked when tag values change.</param>
            /// <param name="logger">Logger for diagnostic information.</param>
            /// <param name="onDispose">Optional callback invoked when the subscription is disposed.</param>
            public CodeFirstSubscription(
                IScadaService client,
                string sessionId,
                List<string> tags,
                Action<string, Vtq> onUpdate,
                ILogger logger,
                Action<ISubscription>? onDispose = null)
            {
                _client = client;
                _sessionId = sessionId;
                _tags = tags;
                _onUpdate = onUpdate;
                _logger = logger;
                _onDispose = onDispose;
            }

            /// <summary>
            /// Starts the subscription asynchronously and begins processing tag value updates.
            /// </summary>
            /// <param name="cancellationToken">Cancellation token.</param>
            /// <returns>A task that completes when the subscription processing has started.</returns>
            public Task StartAsync(CancellationToken cancellationToken = default)
            {
                // The processing task is stored rather than awaited: it runs for the lifetime of
                // the subscription and is awaited by DisposeAsync.
                _processingTask = ProcessUpdatesAsync(cancellationToken);
                return Task.CompletedTask;
            }

            /// <summary>
            /// Enumerates the subscribe stream, converting each message and passing it to the
            /// update callback. Exceptions thrown while handling a single update are logged and
            /// do not stop the loop.
            /// </summary>
            /// <param name="cancellationToken">Caller-supplied token, linked with the internal one.</param>
            private async Task ProcessUpdatesAsync(CancellationToken cancellationToken)
            {
                try
                {
                    var request = new SubscribeRequest
                    {
                        SessionId = _sessionId,
                        Tags = _tags,
                        SamplingMs = DefaultSamplingMs
                    };

                    using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);

                    await foreach (VtqMessage vtq in _client.SubscribeAsync(request, linkedCts.Token))
                    {
                        try
                        {
                            Vtq convertedVtq = ConvertToVtq(vtq.Tag, vtq);
                            _onUpdate(vtq.Tag, convertedVtq);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Error processing subscription update for {Tag}", vtq.Tag);
                        }
                    }
                }
                // IsCancellationRequested is read from the source itself: unlike the Token
                // property it does not throw once the source has been disposed, so the filter
                // cannot silently evaluate to false.
                catch (OperationCanceledException) when (_cts.IsCancellationRequested || cancellationToken.IsCancellationRequested)
                {
                    _logger.LogDebug("Subscription cancelled");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error in subscription processing");
                    try
                    {
                        await _cts.CancelAsync();
                    }
                    catch (Exception cancelEx)
                    {
                        // Best effort only: the source may already be disposed.
                        _logger.LogDebug(cancelEx, "Ignoring failure while cancelling subscription after error");
                    }
                }
                finally
                {
                    // When disposal is already in progress DisposeAsync performs the
                    // notification itself, after it has released the token source.
                    if (Volatile.Read(ref _disposed) == 0)
                    {
                        NotifyDisposedOnce();
                    }
                }
            }

            /// <summary>
            /// Invokes the optional dispose callback at most once for the lifetime of this instance.
            /// </summary>
            private void NotifyDisposedOnce()
            {
                if (Interlocked.Exchange(ref _disposeNotified, 1) == 0)
                {
                    _onDispose?.Invoke(this);
                }
            }

            /// <summary>
            /// Asynchronously disposes the subscription and stops processing tag updates.
            /// </summary>
            /// <returns>A task representing the asynchronous disposal operation.</returns>
            public async Task DisposeAsync()
            {
                // Atomic test-and-set: only the first caller proceeds.
                if (Interlocked.Exchange(ref _disposed, 1) != 0)
                {
                    return;
                }

                try
                {
                    // ConfigureAwait(false): Dispose() blocks on this method, so its
                    // continuations must not require the caller's synchronization context.
                    await _cts.CancelAsync().ConfigureAwait(false);

                    Task? processing = _processingTask;
                    if (processing is not null)
                    {
                        await processing.ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error disposing subscription");
                }
                finally
                {
                    _cts.Dispose();
                    NotifyDisposedOnce();
                }
            }

            /// <summary>
            /// Synchronously disposes the subscription and stops processing tag updates.
            /// Waits up to five seconds for the asynchronous disposal to finish and logs a
            /// warning if it does not; exceptions are logged rather than propagated.
            /// </summary>
            public void Dispose()
            {
                if (Volatile.Read(ref _disposed) != 0)
                {
                    return;
                }

                try
                {
                    Task task = DisposeAsync();
                    if (!task.Wait(SyncDisposeTimeout))
                    {
                        _logger.LogWarning("Subscription disposal timed out");
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error during synchronous disposal");
                }
            }
        }
    }
}