using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client
{
/// <summary>
/// Extension methods for streaming operations with the LmxProxy client.
/// </summary>
public static class StreamingExtensions
{
/// <summary>
/// Reads multiple tag values as an async stream for efficient memory usage with large datasets.
/// </summary>
/// <remarks>
/// Addresses are collected into batches of <paramref name="batchSize"/> and each batch is read with
/// <c>ReadBatchAsync</c>. A full batch is attempted at most twice, with a 100 ms pause after the first
/// failure; if both attempts fail the failure is written to the debug output and the batch is skipped,
/// so its addresses produce no results. The trailing partial batch is attempted once and a failure is
/// only written to the debug output. Argument validation runs when enumeration starts, because this
/// method is an async iterator.
/// </remarks>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to read</param>
/// <param name="batchSize">Size of each batch to process</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An async enumerable of tag values</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> or <paramref name="addresses"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
/// <exception cref="InvalidOperationException">
/// Three full-batch read attempts failed without a successful read in between.
/// </exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
// NOTE(review): the generic type arguments in this method were reconstructed. The string key follows
// from the foreach over addresses; the Vtq value type is assumed - confirm against
// ILmxProxyClient.ReadBatchAsync.
public static async IAsyncEnumerable<KeyValuePair<string, Vtq>> ReadStreamAsync(
    this ILmxProxyClient client,
    IEnumerable<string> addresses,
    int batchSize = 100,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(client);
    ArgumentNullException.ThrowIfNull(addresses);
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive");
    }

    const int maxConsecutiveErrors = 3;
    const int maxRetries = 2;

    var batch = new List<string>(batchSize);

    // Failed attempts since the last successful full-batch read; carried across batches.
    int errorCount = 0;

    foreach (string address in addresses)
    {
        batch.Add(address);
        if (batch.Count >= batchSize)
        {
            bool success = false;
            int retries = 0;
            while (!success && retries < maxRetries)
            {
                IDictionary<string, Vtq>? results = null;
                try
                {
                    results = await client.ReadBatchAsync(batch, cancellationToken);
                    errorCount = 0; // Reset error count on success
                    success = true;
                }
                catch (OperationCanceledException)
                {
                    throw; // Don't retry on cancellation
                }
                catch (Exception ex)
                {
                    retries++;
                    errorCount++;
                    if (errorCount >= maxConsecutiveErrors)
                    {
                        throw new InvalidOperationException(
                            $"Stream reading failed after {maxConsecutiveErrors} consecutive errors", ex);
                    }

                    if (retries >= maxRetries)
                    {
                        // Log error and continue with next batch
                        System.Diagnostics.Debug.WriteLine($"Failed to read batch after {maxRetries} retries: {ex.Message}");
                        batch.Clear();
                        break;
                    }

                    // Wait before retry with exponential backoff
                    await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, retries - 1)), cancellationToken);
                }

                // The yield lives outside the try/catch: C# does not allow yield return inside a
                // try block that has a catch clause.
                if (results != null)
                {
                    foreach (KeyValuePair<string, Vtq> result in results)
                    {
                        yield return result;
                    }

                    batch.Clear();
                }
            }
        }

        cancellationToken.ThrowIfCancellationRequested();
    }

    // Process remaining items
    if (batch.Count > 0)
    {
        IDictionary<string, Vtq>? results = null;
        try
        {
            results = await client.ReadBatchAsync(batch, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            // Log error for final batch but don't throw to allow partial results
            System.Diagnostics.Debug.WriteLine($"Failed to read final batch: {ex.Message}");
        }

        if (results != null)
        {
            foreach (KeyValuePair<string, Vtq> result in results)
            {
                yield return result;
            }
        }
    }
}
/// <summary>
/// Writes multiple tag values as an async stream for efficient memory usage with large datasets.
/// </summary>
/// <remarks>
/// Incoming pairs are buffered by key and flushed with <c>WriteBatchAsync</c> once the buffer holds
/// <paramref name="batchSize"/> distinct keys, and once more for any remainder. A key that repeats
/// before a flush overwrites the earlier buffered value and is counted once. Exceptions from
/// <c>WriteBatchAsync</c> are not caught here.
/// </remarks>
/// <param name="client">The LmxProxy client</param>
/// <param name="values">The values to write as an async enumerable</param>
/// <param name="batchSize">Size of each batch to process</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The number of values written</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> or <paramref name="values"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
// NOTE(review): the generic type arguments in this method were reconstructed. Task<int> follows from
// the returned counter; the <string, object> key/value types are assumed - confirm against
// ILmxProxyClient.WriteBatchAsync.
public static async Task<int> WriteStreamAsync(
    this ILmxProxyClient client,
    IAsyncEnumerable<KeyValuePair<string, object>> values,
    int batchSize = 100,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(client);
    ArgumentNullException.ThrowIfNull(values);
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive");
    }

    var batch = new Dictionary<string, object>(batchSize);
    int totalWritten = 0;

    // Sends the buffered values, adds them to the running total and empties the buffer.
    async Task FlushAsync()
    {
        await client.WriteBatchAsync(batch, cancellationToken);
        totalWritten += batch.Count;
        batch.Clear();
    }

    await foreach (KeyValuePair<string, object> kvp in values.WithCancellation(cancellationToken))
    {
        batch[kvp.Key] = kvp.Value;
        if (batch.Count >= batchSize)
        {
            await FlushAsync();
        }
    }

    // Process remaining items
    if (batch.Count > 0)
    {
        await FlushAsync();
    }

    return totalWritten;
}
/// <summary>
/// Processes tag values in parallel batches for maximum throughput.
/// </summary>
/// <remarks>
/// Values are read with <c>ReadStreamAsync</c> and each one is handed to <paramref name="processor"/>
/// on a <see cref="Task.Run(Func{Task}, CancellationToken)"/> task. A semaphore limits the number of
/// processor invocations in flight to <paramref name="maxDegreeOfParallelism"/>. A failing processor
/// does not stop the remaining values from being processed; its exception surfaces when all work is
/// awaited at the end. If reading or waiting for a free slot fails (including cancellation), the
/// already-started processors are awaited before that failure is rethrown.
/// </remarks>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to read</param>
/// <param name="processor">The async function to process each value</param>
/// <param name="maxDegreeOfParallelism">Maximum number of concurrent operations</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task that completes when every read value has been processed.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/>, <paramref name="addresses"/> or <paramref name="processor"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="maxDegreeOfParallelism"/> is zero or negative.
/// </exception>
// NOTE(review): the generic type arguments in this method were reconstructed. The processor is
// awaited with (kvp.Key, kvp.Value), so it is a two-argument Task-returning delegate; the
// <string, Vtq> types are assumed - confirm against ReadStreamAsync.
public static async Task ProcessInParallelAsync(
    this ILmxProxyClient client,
    IEnumerable<string> addresses,
    Func<string, Vtq, Task> processor,
    int maxDegreeOfParallelism = 4,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(client);
    ArgumentNullException.ThrowIfNull(addresses);
    ArgumentNullException.ThrowIfNull(processor);
    if (maxDegreeOfParallelism <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
    }

    // Disposed when the method exits; every path below awaits the started tasks first, so no
    // processor task can call Release() on a disposed semaphore.
    using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    var tasks = new List<Task>();

    try
    {
        await foreach (KeyValuePair<string, Vtq> kvp in client.ReadStreamAsync(addresses, cancellationToken: cancellationToken))
        {
            // Blocks the read loop until one of the in-flight processors finishes.
            await semaphore.WaitAsync(cancellationToken);
            Task task = Task.Run(async () =>
            {
                try
                {
                    await processor(kvp.Key, kvp.Value);
                }
                finally
                {
                    semaphore.Release();
                }
            }, cancellationToken);
            tasks.Add(task);
        }
    }
    catch
    {
        // Reading or waiting failed. Let the processors that already started run to completion so
        // they are not left running unobserved, then surface the original failure.
        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception)
        {
            // Deliberately ignored: the exception that interrupted the read loop takes precedence.
        }

        throw;
    }

    await Task.WhenAll(tasks);
}
/// <summary>
/// Subscribes to multiple tags and returns updates as an async stream.
/// </summary>
/// <remarks>
/// Updates delivered to the subscription callback are queued in an unbounded channel and yielded in
/// the order they were queued. The stream does not end by itself: enumeration stops when the consumer
/// stops iterating or when <paramref name="cancellationToken"/> is cancelled. In both cases the
/// channel is completed and the subscription is disposed. Argument validation and the subscribe call
/// run when enumeration starts, because this method is an async iterator.
/// </remarks>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to subscribe to</param>
/// <param name="pollIntervalMs">
/// Poll interval in milliseconds. Not used by the current implementation; kept so existing callers
/// continue to compile.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An async enumerable of tag updates</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> or <paramref name="addresses"/> is <c>null</c>.
/// </exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
// NOTE(review): the generic type arguments in this method were reconstructed. Vtq follows from the
// OnUpdate handler and the await foreach variable; IEnumerable<string> for addresses is assumed -
// confirm against ILmxProxyClient.SubscribeAsync.
public static async IAsyncEnumerable<Vtq> SubscribeStreamAsync(
    this ILmxProxyClient client,
    IEnumerable<string> addresses,
    int pollIntervalMs = 1000,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(client);
    ArgumentNullException.ThrowIfNull(addresses);

    var updateChannel = System.Threading.Channels.Channel.CreateUnbounded<Vtq>();

    // Setup update handler: the callback only enqueues, the address argument is not forwarded.
    void OnUpdate(string address, Vtq vtq)
    {
        updateChannel.Writer.TryWrite(vtq);
    }

    ISubscription subscription = await client.SubscribeAsync(addresses, OnUpdate, cancellationToken);
    try
    {
        await foreach (Vtq update in updateChannel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return update;
        }
    }
    finally
    {
        // Complete the writer first so callbacks that still fire while the subscription is being
        // torn down are dropped (TryWrite returns false) instead of queued in a channel nobody reads.
        updateChannel.Writer.TryComplete();
        await subscription.DisposeAsync();
    }
}
}
}