157 lines
5.8 KiB
C#
157 lines
5.8 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.LmxProxy.Client.Domain;
|
|
|
|
namespace ZB.MOM.WW.LmxProxy.Client
|
|
{
|
|
public partial class LmxProxyClient
|
|
{
|
|
private class CodeFirstSubscription : ISubscription
|
|
{
|
|
private readonly IScadaService _client;
|
|
private readonly string _sessionId;
|
|
private readonly List<string> _tags;
|
|
private readonly Action<string, Vtq> _onUpdate;
|
|
private readonly ILogger<LmxProxyClient> _logger;
|
|
private readonly Action<ISubscription>? _onDispose;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
private Task? _processingTask;
|
|
private bool _disposed;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the CodeFirstSubscription class.
|
|
/// </summary>
|
|
/// <param name="client">The gRPC ScadaService client.</param>
|
|
/// <param name="sessionId">The session identifier.</param>
|
|
/// <param name="tags">The list of tag addresses to subscribe to.</param>
|
|
/// <param name="onUpdate">Callback invoked when tag values change.</param>
|
|
/// <param name="logger">Logger for diagnostic information.</param>
|
|
/// <param name="onDispose">Optional callback invoked when the subscription is disposed.</param>
|
|
public CodeFirstSubscription(
|
|
IScadaService client,
|
|
string sessionId,
|
|
List<string> tags,
|
|
Action<string, Vtq> onUpdate,
|
|
ILogger<LmxProxyClient> logger,
|
|
Action<ISubscription>? onDispose = null)
|
|
{
|
|
_client = client;
|
|
_sessionId = sessionId;
|
|
_tags = tags;
|
|
_onUpdate = onUpdate;
|
|
_logger = logger;
|
|
_onDispose = onDispose;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Starts the subscription asynchronously and begins processing tag value updates.
|
|
/// </summary>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
/// <returns>A task that completes when the subscription processing has started.</returns>
|
|
public Task StartAsync(CancellationToken cancellationToken = default)
|
|
{
|
|
_processingTask = ProcessUpdatesAsync(cancellationToken);
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
private async Task ProcessUpdatesAsync(CancellationToken cancellationToken)
|
|
{
|
|
try
|
|
{
|
|
var request = new SubscribeRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Tags = _tags,
|
|
SamplingMs = 1000
|
|
};
|
|
|
|
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
|
|
|
|
await foreach (VtqMessage vtq in _client.SubscribeAsync(request, linkedCts.Token))
|
|
{
|
|
try
|
|
{
|
|
Vtq convertedVtq = ConvertToVtq(vtq.Tag, vtq);
|
|
_onUpdate(vtq.Tag, convertedVtq);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing subscription update for {Tag}", vtq.Tag);
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (_cts.Token.IsCancellationRequested || cancellationToken.IsCancellationRequested)
|
|
{
|
|
_logger.LogDebug("Subscription cancelled");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error in subscription processing");
|
|
try { await _cts.CancelAsync(); } catch { /* ignore */ }
|
|
}
|
|
finally
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
_disposed = true;
|
|
_onDispose?.Invoke(this);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Asynchronously disposes the subscription and stops processing tag updates.
|
|
/// </summary>
|
|
/// <returns>A task representing the asynchronous disposal operation.</returns>
|
|
public async Task DisposeAsync()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
|
|
await _cts.CancelAsync();
|
|
|
|
try
|
|
{
|
|
if (_processingTask != null)
|
|
{
|
|
await _processingTask;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error disposing subscription");
|
|
}
|
|
finally
|
|
{
|
|
_cts.Dispose();
|
|
_onDispose?.Invoke(this);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Synchronously disposes the subscription and stops processing tag updates.
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
if (_disposed) return;
|
|
|
|
try
|
|
{
|
|
Task task = DisposeAsync();
|
|
if (!task.Wait(TimeSpan.FromSeconds(5)))
|
|
{
|
|
_logger.LogWarning("Subscription disposal timed out");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error during synchronous disposal");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|