using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Polly;
using ZB.MOM.WW.LmxProxy.Host.Domain;
using ZB.MOM.WW.LmxProxy.Host.Services;
namespace ZB.MOM.WW.LmxProxy.Host.Implementation
{
/// <summary>
/// Read and write operations for MxAccessClient.
/// </summary>
public sealed partial class MxAccessClient
{
/// <summary>
/// Reads a single address, running the read through the policy returned by
/// <c>RetryPolicies.CreateReadPolicy</c>.
/// </summary>
/// <param name="address">Address to read.</param>
/// <param name="ct">Token forwarded to the underlying single-value read.</param>
/// <returns>The value produced by <c>ReadSingleValueAsync</c>.</returns>
public async Task ReadAsync(string address, CancellationToken ct = default)
{
    IAsyncPolicy retryPolicy = RetryPolicies.CreateReadPolicy();
    string operationName = $"Read-{address}";

    // The connection check lives inside the delegate, so it is evaluated every
    // time the policy invokes the delegate rather than once up front.
    return await retryPolicy.ExecuteWithRetryAsync(async () =>
    {
        ValidateConnection();
        return await ReadSingleValueAsync(address, ct);
    }, operationName);
}
/// <summary>
/// Reads several addresses concurrently and collects one result per address.
/// </summary>
/// <param name="addresses">Addresses to read; enumerated once into a list.</param>
/// <param name="ct">Token forwarded to every individual read.</param>
/// <returns>
/// The dictionary populated by <c>ReadAddressWithSemaphoreAsync</c>, keyed by address.
/// </returns>
public async Task> ReadBatchAsync(IEnumerable addresses,
    CancellationToken ct = default)
{
    var requested = addresses.ToList();
    var results = new Dictionary(requested.Count);

    // One task per address; each task stores its own outcome into the shared dictionary.
    var pendingReads =
        from address in requested
        select ReadAddressWithSemaphoreAsync(address, results, ct);

    await Task.WhenAll(pendingReads);
    return results;
}
/// <summary>
/// Writes a value to a single address, running the write through the policy
/// returned by <c>RetryPolicies.CreateWritePolicy</c>.
/// </summary>
/// <param name="address">Address to write.</param>
/// <param name="value">Value to write.</param>
/// <param name="ct">Token forwarded to the internal write.</param>
public async Task WriteAsync(string address, object value, CancellationToken ct = default)
{
    IAsyncPolicy retryPolicy = RetryPolicies.CreateWritePolicy();
    string operationName = $"Write-{address}";

    await retryPolicy.ExecuteWithRetryAsync(
        async () => await WriteInternalAsync(address, value, ct),
        operationName);
}
/// <summary>
/// Writes every address/value pair in <paramref name="values"/> concurrently and
/// completes once all of the individual writes have finished.
/// </summary>
/// <param name="values">Address/value pairs to write.</param>
/// <param name="ct">Token forwarded to every individual write.</param>
public async Task WriteBatchAsync(IReadOnlyDictionary values, CancellationToken ct = default)
{
    // One task per entry; each write is handled by WriteAddressWithSemaphoreAsync.
    var writeTasks =
        from entry in values
        select WriteAddressWithSemaphoreAsync(entry.Key, entry.Value, ct);

    await Task.WhenAll(writeTasks);
}
/// <summary>
/// Writes a batch of values, then writes a flag, then waits until the response
/// address reports the expected value.
/// </summary>
/// <param name="values">Address/value pairs written first, as a batch.</param>
/// <param name="flagAddress">Address written after the batch has completed.</param>
/// <param name="flagValue">Value written to <paramref name="flagAddress"/>.</param>
/// <param name="responseAddress">Address watched for the response.</param>
/// <param name="responseValue">Value that counts as the expected response.</param>
/// <param name="ct">Token forwarded to every step.</param>
/// <returns>The outcome reported by <c>WaitForResponseAsync</c>.</returns>
public async Task WriteBatchAndWaitAsync(
    IReadOnlyDictionary values,
    string flagAddress,
    object flagValue,
    string responseAddress,
    object responseValue,
    CancellationToken ct = default)
{
    // The three steps are strictly sequential: the flag is only written after the
    // whole batch has completed, and the wait only starts after the flag write.
    await WriteBatchAsync(values, ct);
    await WriteAsync(flagAddress, flagValue, ct);

    var responseSeen = await WaitForResponseAsync(responseAddress, responseValue, ct);
    return responseSeen;
}
#region Private Helper Methods
/// <summary>
/// Guard used before an operation: succeeds silently when the client reports
/// itself as connected.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when <c>IsConnected</c> is <c>false</c>.
/// </exception>
private void ValidateConnection()
{
    if (IsConnected)
    {
        return;
    }

    throw new InvalidOperationException("Not connected to MxAccess");
}
/// <summary>
/// Reads a single value by subscribing to the address, taking the first value
/// delivered to the subscription callback, and then disposing the subscription.
/// </summary>
/// <param name="address">Address to read.</param>
/// <param name="ct">Token forwarded to the subscription and to the wait.</param>
/// <returns>The first value delivered for <paramref name="address"/>.</returns>
private async Task ReadSingleValueAsync(string address, CancellationToken ct)
{
    // MxAccess doesn't support direct read - we need to subscribe, get the value, then unsubscribe.
    // RunContinuationsAsynchronously keeps the code after the await (which disposes the
    // subscription) from running inline inside the subscription callback that completes the task.
    var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    IAsyncDisposable? subscription = null;
    try
    {
        // Only the first delivered value wins; later callbacks are ignored by TrySetResult.
        subscription = await SubscribeAsync(new[] { address }, (addr, vtq) => { tcs.TrySetResult(vtq); }, ct);
        return await WaitForReadResultAsync(tcs, ct);
    }
    finally
    {
        // Runs on success, timeout and cancellation; subscription stays null if SubscribeAsync threw.
        if (subscription != null)
        {
            await subscription.DisposeAsync();
        }
    }
}
/// <summary>
/// Awaits the read result, bounded by <c>_configuration.ReadTimeoutSeconds</c>.
/// </summary>
/// <param name="tcs">Completion source that the subscription callback completes.</param>
/// <param name="ct">Caller token; cancelling it cancels the pending read.</param>
/// <returns>The value the completion source was completed with.</returns>
/// <exception cref="TimeoutException">
/// Thrown when no value arrives within the configured read timeout.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="ct"/> is cancelled before a value arrives.
/// </exception>
private async Task WaitForReadResultAsync(TaskCompletionSource tcs, CancellationToken ct)
{
    // Caller cancellation and the timeout are kept as separate registrations so that a
    // cancelled read is reported as a cancellation and not as a "Read timeout".
    // Both registrations are disposed once the wait ends.
    using (var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(_configuration.ReadTimeoutSeconds)))
    using (ct.Register(() => tcs.TrySetCanceled(ct)))
    using (timeoutCts.Token.Register(() => tcs.TrySetException(new TimeoutException("Read timeout"))))
    {
        return await tcs.Task;
    }
}
/// <summary>
/// Batch-read worker: reads one address while holding a slot of <c>_readSemaphore</c>
/// and records the outcome in the shared result dictionary.
/// </summary>
/// <param name="address">Address to read.</param>
/// <param name="results">Shared dictionary; every access is guarded by locking on it.</param>
/// <param name="ct">Token used for the semaphore wait and for the read.</param>
private async Task ReadAddressWithSemaphoreAsync(string address, Dictionary results,
    CancellationToken ct)
{
    // Both outcomes are stored through the same synchronized path.
    void Store(Vtq outcome)
    {
        lock (results)
        {
            results[address] = outcome;
        }
    }

    // The wait sits outside the try block, so a cancelled wait propagates to the caller
    // and never releases a slot that was not acquired.
    await _readSemaphore.WaitAsync(ct);
    try
    {
        Store(await ReadAsync(address, ct));
    }
    catch (Exception ex)
    {
        // A failed read does not fail the batch: it is logged and recorded as a bad value.
        Logger.Warning(ex, "Failed to read {Address}", address);
        Store(Vtq.Bad());
    }
    finally
    {
        _readSemaphore.Release();
    }
}
/// <summary>
/// Performs one write attempt: starts the write, then waits for its completion.
/// </summary>
/// <param name="address">Address to write.</param>
/// <param name="value">Value to write.</param>
/// <param name="ct">Token forwarded to the setup and to the completion wait.</param>
/// <remarks>
/// NOTE(review): on success nothing is cleaned up here; presumably the code that completes
/// the pending write (not visible in this part of the class) releases the item - confirm.
/// </remarks>
private async Task WriteInternalAsync(string address, object value, CancellationToken ct)
{
    // RunContinuationsAsynchronously keeps the awaiting code (cleanup, retry handling) from
    // running inline on whichever thread completes the pending write.
    var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    int itemHandle = await SetupWriteOperationAsync(address, value, tcs, ct);
    try
    {
        await WaitForWriteCompletionAsync(tcs, itemHandle, address, ct);
    }
    catch
    {
        // Failure, timeout or cancellation: release the item and the tracking entry.
        // The cleanup is a no-op when the handle is no longer tracked.
        await CleanupWriteOperationAsync(itemHandle);
        throw;
    }
}
/// <summary>
/// Starts a write on a thread-pool thread while holding <c>_lock</c> and yields the
/// handle of the item being written.
/// </summary>
/// <param name="address">Address to write.</param>
/// <param name="value">Value to write.</param>
/// <param name="tcs">Completion source registered for the pending write.</param>
/// <param name="ct">Token passed to <c>Task.Run</c>.</param>
/// <returns>The item handle returned by <c>InitiateWriteOperation</c>.</returns>
private async Task SetupWriteOperationAsync(string address, object value, TaskCompletionSource tcs,
    CancellationToken ct)
{
    var handle = await Task.Run(() =>
    {
        // The connection check and the write initiation happen under the same lock.
        lock (_lock)
        {
            ValidateConnectionLocked();
            var started = InitiateWriteOperation(address, value, tcs);
            return started;
        }
    }, ct);

    return handle;
}
/// <summary>
/// Connection guard for callers that already hold the lock: requires both a connected
/// state and a non-null proxy instance.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when <c>IsConnected</c> is <c>false</c> or <c>_lmxProxy</c> is <c>null</c>.
/// </exception>
private void ValidateConnectionLocked()
{
    if (IsConnected && _lmxProxy != null)
    {
        return;
    }

    throw new InvalidOperationException("Not connected to MxAccess");
}
/// <summary>
/// Adds and advises the item, registers the pending write, and issues the write call.
/// </summary>
/// <param name="address">Address to write.</param>
/// <param name="value">Value to write.</param>
/// <param name="tcs">Completion source stored with the pending write.</param>
/// <returns>The handle returned by <c>AddItem</c> for <paramref name="address"/>.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when <c>_lmxProxy</c> is <c>null</c>.
/// </exception>
private int InitiateWriteOperation(string address, object value, TaskCompletionSource tcs)
{
    // Stays 0 until AddItem succeeds, which makes the failure cleanup a no-op in that case.
    int handle = 0;
    try
    {
        var proxy = _lmxProxy ?? throw new InvalidOperationException("MxAccess proxy is not initialized");

        // Step 1: obtain a handle for the address.
        handle = proxy.AddItem(_connectionHandle, address);

        // Step 2: advise the item so that it can be written.
        proxy.AdviseSupervisory(_connectionHandle, handle);

        // Step 3: register the pending write before the write call is issued.
        TrackPendingWrite(address, handle, tcs);

        // Step 4: issue the write; -1 means no security.
        proxy.Write(_connectionHandle, handle, value, -1);

        return handle;
    }
    catch (Exception ex)
    {
        // Undo whatever was set up, log, and let the caller see the original exception.
        CleanupFailedWrite(handle);
        Logger.Error(ex, "Failed to write value to {Address}", address);
        throw;
    }
}
/// <summary>
/// Records a write as pending, keyed by its item handle. An existing entry for the same
/// handle is overwritten.
/// </summary>
/// <param name="address">Address being written.</param>
/// <param name="itemHandle">Item handle used as the key.</param>
/// <param name="tcs">Completion source associated with the write.</param>
private void TrackPendingWrite(string address, int itemHandle, TaskCompletionSource tcs)
{
    _pendingWrites[itemHandle] = new WriteOperation
    {
        Address = address,
        ItemHandle = itemHandle,
        CompletionSource = tcs,
        StartTime = DateTime.UtcNow // start time is recorded in UTC
    };
}
/// <summary>
/// Best-effort cleanup after a write could not be initiated: drops the tracking entry and
/// releases the item on the proxy. Never throws for proxy failures; they are logged.
/// </summary>
/// <param name="itemHandle">
/// Handle of the item to release. Values that are not positive mean no item was added,
/// so nothing is done.
/// </param>
private void CleanupFailedWrite(int itemHandle)
{
    var proxy = _lmxProxy;
    if (itemHandle <= 0 || proxy == null)
    {
        return;
    }

    // Drop the tracking entry first so that a failing proxy call cannot leave it behind.
    _pendingWrites.Remove(itemHandle);

    // Each proxy call is attempted independently: UnAdvise can fail (for example when the
    // failure happened before the item was advised) and must not prevent RemoveItem.
    try
    {
        proxy.UnAdvise(_connectionHandle, itemHandle);
    }
    catch (Exception ex)
    {
        Logger.Warning(ex, "Failed to unadvise item {ItemHandle} while cleaning up a failed write", itemHandle);
    }

    try
    {
        proxy.RemoveItem(_connectionHandle, itemHandle);
    }
    catch (Exception ex)
    {
        Logger.Warning(ex, "Failed to remove item {ItemHandle} while cleaning up a failed write", itemHandle);
    }
}
/// <summary>
/// Waits until the pending write completes, bounded by
/// <c>_configuration.WriteTimeoutSeconds</c>.
/// </summary>
/// <param name="tcs">Completion source of the pending write.</param>
/// <param name="itemHandle">Handle of the item being written; used for timeout cleanup.</param>
/// <param name="address">Address being written; used in the timeout message.</param>
/// <param name="ct">Caller token; cancelling it cancels the pending write task.</param>
/// <exception cref="TimeoutException">
/// Thrown when the write does not complete within the configured timeout.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="ct"/> is cancelled before the write completes.
/// </exception>
private async Task WaitForWriteCompletionAsync(TaskCompletionSource tcs, int itemHandle, string address,
    CancellationToken ct)
{
    // The delay uses its own token instead of the caller's. If it were tied to ct, a
    // cancellation could finish the delay task first and be reported as a timeout.
    using (var timeoutCts = new CancellationTokenSource())
    using (ct.Register(() => tcs.TrySetCanceled()))
    {
        Task timeoutTask = Task.Delay(TimeSpan.FromSeconds(_configuration.WriteTimeoutSeconds), timeoutCts.Token);
        try
        {
            Task completedTask = await Task.WhenAny(tcs.Task, timeoutTask);
            if (completedTask == timeoutTask)
            {
                // Always throws TimeoutException after cleaning up.
                await HandleWriteTimeoutAsync(itemHandle, address);
            }

            await tcs.Task; // This will throw if the write failed or was cancelled
        }
        finally
        {
            // Stop the timer as soon as the wait is over instead of letting it run out.
            timeoutCts.Cancel();
        }
    }
}
/// <summary>
/// Timeout path of a write: releases the write's resources and then always throws.
/// </summary>
/// <param name="itemHandle">Handle of the item whose write timed out.</param>
/// <param name="address">Address included in the exception message.</param>
/// <exception cref="TimeoutException">Always thrown once cleanup has finished.</exception>
private async Task HandleWriteTimeoutAsync(int itemHandle, string address)
{
    await CleanupWriteOperationAsync(itemHandle);

    var message = $"Write operation to {address} timed out";
    throw new TimeoutException(message);
}
/// <summary>
/// Best-effort cleanup of a pending write, executed on a thread-pool thread under
/// <c>_lock</c>. Does nothing when the handle is not (or no longer) tracked, so repeated
/// calls for the same handle are harmless. Proxy failures are logged, not thrown.
/// </summary>
/// <param name="itemHandle">Handle of the item whose pending write is cleaned up.</param>
private async Task CleanupWriteOperationAsync(int itemHandle)
{
    await Task.Run(() =>
    {
        lock (_lock)
        {
            // Remove reports whether the handle was tracked, which avoids a separate
            // ContainsKey lookup.
            if (!_pendingWrites.Remove(itemHandle))
            {
                return;
            }

            var proxy = _lmxProxy;
            if (proxy == null)
            {
                return;
            }

            // Each proxy call is attempted independently so that a failing UnAdvise
            // does not prevent RemoveItem from releasing the item.
            try
            {
                proxy.UnAdvise(_connectionHandle, itemHandle);
            }
            catch (Exception ex)
            {
                Logger.Warning(ex, "Failed to unadvise item {ItemHandle} during write cleanup", itemHandle);
            }

            try
            {
                proxy.RemoveItem(_connectionHandle, itemHandle);
            }
            catch (Exception ex)
            {
                Logger.Warning(ex, "Failed to remove item {ItemHandle} during write cleanup", itemHandle);
            }
        }
    });
}
/// <summary>
/// Batch-write worker: performs one write while holding a slot of <c>_writeSemaphore</c>.
/// Unlike the batch-read worker, failures are not caught here and propagate to the caller.
/// </summary>
/// <param name="address">Address to write.</param>
/// <param name="value">Value to write.</param>
/// <param name="ct">Token used for the semaphore wait and for the write.</param>
private async Task WriteAddressWithSemaphoreAsync(string address, object value, CancellationToken ct)
{
    // Acquire outside the try block: a cancelled wait must not release a slot it never got.
    await _writeSemaphore.WaitAsync(ct);
    try
    {
        Task pendingWrite = WriteAsync(address, value, ct);
        await pendingWrite;
    }
    finally
    {
        // The slot is handed back whether the write succeeded or threw.
        _writeSemaphore.Release();
    }
}
/// <summary>
/// Subscribes to <paramref name="responseAddress"/> and waits until a delivered value
/// equals <paramref name="responseValue"/>, or until <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="responseAddress">Address to watch.</param>
/// <param name="responseValue">Value that counts as the expected response.</param>
/// <param name="ct">
/// Token forwarded to the subscription. Cancelling it ends the wait with <c>false</c>
/// instead of throwing. There is no other timeout on this wait.
/// </param>
/// <returns>
/// <c>true</c> when the expected value was observed; <c>false</c> when the wait was
/// cancelled first.
/// </returns>
private async Task WaitForResponseAsync(string responseAddress, object responseValue,
    CancellationToken ct)
{
    // RunContinuationsAsynchronously keeps the code after the await (which disposes the
    // subscription) from running inline inside the subscription callback or on the thread
    // that cancels the token.
    var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    IAsyncDisposable? subscription = null;
    try
    {
        subscription = await SubscribeAsync(new[] { responseAddress }, (addr, vtq) =>
        {
            // NOTE(review): if Value is typed as object this comparison is sensitive to the
            // runtime type (a boxed int does not equal a boxed long) - confirm that callers
            // pass the same type the subscription delivers.
            if (Equals(vtq.Value, responseValue))
            {
                tcs.TrySetResult(true);
            }
        }, ct);

        // Wait for the response value; cancellation resolves the wait to false.
        using (ct.Register(() => tcs.TrySetResult(false)))
        {
            return await tcs.Task;
        }
    }
    finally
    {
        // subscription stays null if SubscribeAsync threw.
        if (subscription != null)
        {
            await subscription.DisposeAsync();
        }
    }
}
/// <summary>
/// Maps a write error code to a human-readable message.
/// </summary>
/// <param name="errorCode">The error code.</param>
/// <returns>
/// The message for the codes 1008, 1012 and 1013; for any other code, a generic message
/// that includes the code.
/// </returns>
private static string GetWriteErrorMessage(int errorCode)
{
    switch (errorCode)
    {
        case 1008:
            return "User lacks proper security for write operation";
        case 1012:
            return "Secured write required";
        case 1013:
            return "Verified write required";
        default:
            return $"Unknown error code: {errorCode}";
    }
}
#endregion
}
}