Files
scadalink-design/lmxproxy/src-reference/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.CodeFirstSubscription.cs
Joseph Doherty 0d63fb1105 feat(lmxproxy): phase 1 — v2 protocol types and domain model
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 23:41:56 -04:00

157 lines
5.8 KiB
C#

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client
{
public partial class LmxProxyClient
{
private class CodeFirstSubscription : ISubscription
{
private readonly IScadaService _client;
private readonly string _sessionId;
private readonly List<string> _tags;
private readonly Action<string, Vtq> _onUpdate;
private readonly ILogger<LmxProxyClient> _logger;
private readonly Action<ISubscription>? _onDispose;
private readonly CancellationTokenSource _cts = new();
private Task? _processingTask;
private bool _disposed;
/// <summary>
/// Initializes a new instance of the CodeFirstSubscription class.
/// </summary>
/// <param name="client">The gRPC ScadaService client.</param>
/// <param name="sessionId">The session identifier.</param>
/// <param name="tags">The list of tag addresses to subscribe to.</param>
/// <param name="onUpdate">Callback invoked when tag values change.</param>
/// <param name="logger">Logger for diagnostic information.</param>
/// <param name="onDispose">Optional callback invoked when the subscription is disposed.</param>
public CodeFirstSubscription(
IScadaService client,
string sessionId,
List<string> tags,
Action<string, Vtq> onUpdate,
ILogger<LmxProxyClient> logger,
Action<ISubscription>? onDispose = null)
{
_client = client;
_sessionId = sessionId;
_tags = tags;
_onUpdate = onUpdate;
_logger = logger;
_onDispose = onDispose;
}
/// <summary>
/// Starts the subscription asynchronously and begins processing tag value updates.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task that completes when the subscription processing has started.</returns>
public Task StartAsync(CancellationToken cancellationToken = default)
{
_processingTask = ProcessUpdatesAsync(cancellationToken);
return Task.CompletedTask;
}
private async Task ProcessUpdatesAsync(CancellationToken cancellationToken)
{
try
{
var request = new SubscribeRequest
{
SessionId = _sessionId,
Tags = _tags,
SamplingMs = 1000
};
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
await foreach (VtqMessage vtq in _client.SubscribeAsync(request, linkedCts.Token))
{
try
{
Vtq convertedVtq = ConvertToVtq(vtq.Tag, vtq);
_onUpdate(vtq.Tag, convertedVtq);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing subscription update for {Tag}", vtq.Tag);
}
}
}
catch (OperationCanceledException) when (_cts.Token.IsCancellationRequested || cancellationToken.IsCancellationRequested)
{
_logger.LogDebug("Subscription cancelled");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in subscription processing");
try { await _cts.CancelAsync(); } catch { /* ignore */ }
}
finally
{
if (!_disposed)
{
_disposed = true;
_onDispose?.Invoke(this);
}
}
}
/// <summary>
/// Asynchronously disposes the subscription and stops processing tag updates.
/// </summary>
/// <returns>A task representing the asynchronous disposal operation.</returns>
public async Task DisposeAsync()
{
if (_disposed) return;
_disposed = true;
await _cts.CancelAsync();
try
{
if (_processingTask != null)
{
await _processingTask;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error disposing subscription");
}
finally
{
_cts.Dispose();
_onDispose?.Invoke(this);
}
}
/// <summary>
/// Synchronously disposes the subscription and stops processing tag updates.
/// </summary>
public void Dispose()
{
if (_disposed) return;
try
{
Task task = DisposeAsync();
if (!task.Wait(TimeSpan.FromSeconds(5)))
{
_logger.LogWarning("Subscription disposal timed out");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during synchronous disposal");
}
}
}
}
}