using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Configuration;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.Implementation
{
/// <summary>
/// Implementation of <see cref="IScadaClient"/> using ArchestrA MxAccess.
/// Provides connection management, read/write operations, and subscription support for SCADA tags.
/// </summary>
public sealed partial class MxAccessClient : IScadaClient
{
// Concurrency limit used for both semaphores when the configuration's MaxConcurrentOperations is null.
private const int DefaultMaxConcurrency = 10;
// Serilog logger shared by every instance of this class.
private static readonly ILogger Logger = Log.ForContext();
// Connection settings passed to the constructor; the constructor rejects null, so this is never null.
private readonly ConnectionConfiguration _configuration;
// Monitor used by IsConnected / ConnectionState to read the connection fields consistently and by
// RemoveStoredSubscription to mutate _storedSubscriptions.
private readonly object _lock = new();
// Bookkeeping for writes that are still pending. Not referenced in this part of the partial class;
// presumably keyed by an item handle — TODO confirm against the write path.
private readonly Dictionary _pendingWrites = new();
// Concurrency control for batch operations. Both semaphores are created in the constructor with the
// configured (or default) limit as their initial and maximum count.
private readonly SemaphoreSlim _readSemaphore;
// Store subscription details for automatic recreation after reconnect.
// Entries expose a GroupId and are removed per group by RemoveStoredSubscription.
private readonly List _storedSubscriptions = new();
// Subscription lookup tables. Not referenced in this part of the partial class; the second one is
// presumably keyed by item handle, judging by its name — TODO confirm.
private readonly Dictionary _subscriptions = new();
private readonly Dictionary _subscriptionsByHandle = new();
private readonly SemaphoreSlim _writeSemaphore;
// Handle of the current connection; IsConnected only reports true while this is greater than zero.
private int _connectionHandle;
// Current connection state; starts as Disconnected and is changed through SetConnectionState.
private ConnectionState _connectionState = ConnectionState.Disconnected;
// Set by DisposeAsync; once true, further DisposeAsync calls return immediately.
private bool _disposed;
// Proxy object for the connection; IsConnected treats null as "not connected".
private LMXProxyServer? _lmxProxy;
/// <summary>
/// Initializes a new instance of the <see cref="MxAccessClient"/> class.
/// </summary>
/// <param name="configuration">The connection configuration settings.</param>
/// <exception cref="ArgumentNullException"><paramref name="configuration"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// The configured <c>MaxConcurrentOperations</c> value is less than 1.
/// </exception>
public MxAccessClient(ConnectionConfiguration configuration)
{
    _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));

    // Fall back to the built-in default when the configuration leaves the limit unset.
    int maxConcurrency = _configuration.MaxConcurrentOperations ?? DefaultMaxConcurrency;

    // SemaphoreSlim would reject a non-positive count with the same exception type, but its message
    // names "initialCount"/"maxCount" and gives no hint that the configuration is at fault.
    // Validate up front so the failure points at the real cause and no semaphore is half-created.
    if (maxConcurrency < 1)
    {
        throw new ArgumentOutOfRangeException(
            nameof(configuration),
            maxConcurrency,
            "MaxConcurrentOperations must be at least 1.");
    }

    // Reads and writes are throttled independently, each starting with every slot available.
    _readSemaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    _writeSemaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
}
/// <summary>
/// Gets a value indicating whether the client holds a proxy instance, has a positive connection
/// handle, and is in the <c>Connected</c> state. All three fields are read under the instance lock.
/// </summary>
public bool IsConnected
{
    get
    {
        lock (_lock)
        {
            // Without a proxy object or a valid handle the state value alone is not trusted.
            if (_lmxProxy == null || _connectionHandle <= 0)
            {
                return false;
            }

            return _connectionState == ConnectionState.Connected;
        }
    }
}
/// <summary>
/// Gets the current connection state, read under the instance lock.
/// </summary>
public ConnectionState ConnectionState
{
    get
    {
        ConnectionState snapshot;

        // Copy the field while holding the lock, then hand the copy back to the caller.
        lock (_lock)
        {
            snapshot = _connectionState;
        }

        return snapshot;
    }
}
/// <summary>
/// Occurs when the connection state changes.
/// </summary>
/// <remarks>
/// Raised by <c>SetConnectionState</c> only when the new state differs from the previous one.
/// The event arguments carry the previous state, the new state, and an optional message.
/// </remarks>
public event EventHandler? ConnectionStateChanged;
/// <summary>
/// Disconnects the client and releases the read/write semaphores.
/// </summary>
/// <remarks>
/// Calls made after disposal has finished return immediately. If disconnecting throws, the
/// exception still propagates to the caller, but the instance is marked as disposed and the
/// semaphores are released regardless.
/// </remarks>
/// <returns>A task-like value that completes when disposal has finished.</returns>
public async ValueTask DisposeAsync()
{
    if (_disposed)
    {
        return;
    }

    try
    {
        // Dispose() blocks synchronously on this method, so do not resume on a captured
        // synchronization context here; that is a classic sync-over-async deadlock.
        await DisconnectAsync().ConfigureAwait(false);
    }
    finally
    {
        // The flag is still set only after the disconnect attempt (as before), but now also when
        // that attempt fails, so a failed disconnect no longer leaks the semaphores or leaves
        // the instance looking undisposed.
        _disposed = true;

        // Both fields are readonly and assigned in the constructor, so no null check is needed.
        _readSemaphore.Dispose();
        _writeSemaphore.Dispose();
    }
}
/// <summary>
/// Synchronously disposes the client by running <see cref="DisposeAsync"/> to completion.
/// </summary>
/// <remarks>
/// Blocks the calling thread until asynchronous disposal has finished. Prefer
/// <see cref="DisposeAsync"/> from asynchronous code.
/// </remarks>
public void Dispose()
{
    // A ValueTask must not be blocked on directly with GetAwaiter().GetResult() unless it has
    // already completed. Converting it to a Task first is the supported way to wait synchronously.
    DisposeAsync().AsTask().GetAwaiter().GetResult();
}
/// <summary>
/// Sets the connection state and raises the <see cref="ConnectionStateChanged"/> event.
/// </summary>
/// <remarks>
/// Does nothing when <paramref name="newState"/> equals the current state, so no log entry is
/// written and no event is raised for a redundant transition.
/// </remarks>
/// <param name="newState">The new connection state.</param>
/// <param name="message">Optional message describing the state change; passed to the event arguments.</param>
private void SetConnectionState(ConnectionState newState, string? message = null)
{
    ConnectionState previousState;

    // The public getters read _connectionState under _lock, so compare and update it under the
    // same lock. Monitor locks are re-entrant, which keeps this safe for callers that already
    // hold _lock when they get here.
    lock (_lock)
    {
        previousState = _connectionState;
        if (previousState == newState)
        {
            return;
        }

        _connectionState = newState;
    }

    // Logging and subscriber callbacks run after this method's own lock scope has ended, so
    // this method never holds _lock on their behalf.
    Logger.Information("Connection state changed from {Previous} to {Current}", previousState, newState);
    ConnectionStateChanged?.Invoke(this, new ConnectionStateChangedEventArgs(previousState, newState, message));
}
/// <summary>
/// Drops every stored subscription entry that belongs to the given group.
/// </summary>
/// <remarks>
/// The removal and the debug log entry both happen while the instance lock is held. The log entry
/// is written whether or not any entry matched.
/// </remarks>
/// <param name="groupId">Identifier of the subscription group whose stored entries are removed.</param>
private void RemoveStoredSubscription(string groupId)
{
    lock (_lock)
    {
        // Remove all entries of the group in one pass.
        _storedSubscriptions.RemoveAll(stored => stored.GroupId == groupId);

        Logger.Debug("Removed stored subscription group {GroupId}", groupId);
    }
}
// Placeholder fields: nothing in this part of the class reads or assigns them, so the compiler's
// "field is never used" warning (CS0169) is suppressed around the declarations only.
#pragma warning disable CS0169 // Field is never used - reserved for future functionality
// NOTE(review): presumably the node name of the current connection — confirm when this is wired up.
private string? _currentNodeName;
// NOTE(review): presumably the galaxy name of the current connection — confirm when this is wired up.
private string? _currentGalaxyName;
#pragma warning restore CS0169
}
}