using System; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using ZB.MOM.WW.LmxProxy.Client.Domain; namespace ZB.MOM.WW.LmxProxy.Client { /// /// Extension methods for streaming operations with the LmxProxy client /// public static class StreamingExtensions { /// /// Reads multiple tag values as an async stream for efficient memory usage with large datasets /// /// The LmxProxy client /// The addresses to read /// Size of each batch to process /// Cancellation token /// An async enumerable of tag values public static async IAsyncEnumerable> ReadStreamAsync( this ILmxProxyClient client, IEnumerable addresses, int batchSize = 100, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(addresses); if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive"); var batch = new List(batchSize); int errorCount = 0; const int maxConsecutiveErrors = 3; foreach (string address in addresses) { batch.Add(address); if (batch.Count >= batchSize) { bool success = false; int retries = 0; const int maxRetries = 2; while (!success && retries < maxRetries) { IDictionary? results = null; Exception? lastException = null; try { results = await client.ReadBatchAsync(batch, cancellationToken); errorCount = 0; // Reset error count on success success = true; } catch (OperationCanceledException) { throw; // Don't retry on cancellation } catch (Exception ex) { lastException = ex; retries++; errorCount++; if (errorCount >= maxConsecutiveErrors) { throw new InvalidOperationException( $"Stream reading failed after {maxConsecutiveErrors} consecutive errors", ex); } if (retries >= maxRetries) { // Log error and continue with next batch System.Diagnostics.Debug.WriteLine($"Failed to read batch after {maxRetries} retries: {ex.Message}"); batch.Clear(); break; } // Wait before retry with exponential backoff await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, retries - 1)), cancellationToken); } if (results != null) { foreach (KeyValuePair result in results) { yield return result; } batch.Clear(); } } } cancellationToken.ThrowIfCancellationRequested(); } // Process remaining items if (batch.Count > 0) { IDictionary? results = null; try { results = await client.ReadBatchAsync(batch, cancellationToken); } catch (OperationCanceledException) { throw; } catch (Exception ex) { // Log error for final batch but don't throw to allow partial results System.Diagnostics.Debug.WriteLine($"Failed to read final batch: {ex.Message}"); } if (results != null) { foreach (KeyValuePair result in results) { yield return result; } } } } /// /// Writes multiple tag values as an async stream for efficient memory usage with large datasets /// /// The LmxProxy client /// The values to write as an async enumerable /// Size of each batch to process /// Cancellation token /// The number of values written public static async Task WriteStreamAsync( this ILmxProxyClient client, IAsyncEnumerable> values, int batchSize = 100, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(values); if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive"); var batch = new Dictionary(batchSize); int totalWritten = 0; await foreach (KeyValuePair kvp in values.WithCancellation(cancellationToken)) { batch[kvp.Key] = kvp.Value; if (batch.Count >= batchSize) { await client.WriteBatchAsync(batch, cancellationToken); totalWritten += batch.Count; batch.Clear(); } } // Process remaining items if (batch.Count > 0) { await client.WriteBatchAsync(batch, cancellationToken); totalWritten += batch.Count; } return totalWritten; } /// /// Processes tag values in parallel batches for maximum throughput /// /// The LmxProxy client /// The addresses to read /// The async function to process each value /// Maximum number of concurrent operations /// Cancellation token public static async Task ProcessInParallelAsync( this ILmxProxyClient client, IEnumerable addresses, Func processor, int maxDegreeOfParallelism = 4, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(addresses); ArgumentNullException.ThrowIfNull(processor); if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism); var tasks = new List(); await foreach (KeyValuePair kvp in client.ReadStreamAsync(addresses, cancellationToken: cancellationToken)) { await semaphore.WaitAsync(cancellationToken); var task = Task.Run(async () => { try { await processor(kvp.Key, kvp.Value); } finally { semaphore.Release(); } }, cancellationToken); tasks.Add(task); } await Task.WhenAll(tasks); } /// /// Subscribes to multiple tags and returns updates as an async stream /// /// The LmxProxy client /// The addresses to subscribe to /// Poll interval in milliseconds /// Cancellation token /// An async enumerable of tag updates public static async IAsyncEnumerable SubscribeStreamAsync( this ILmxProxyClient client, IEnumerable addresses, int pollIntervalMs = 1000, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(addresses); var updateChannel = System.Threading.Channels.Channel.CreateUnbounded(); // Setup update handler void OnUpdate(string address, Vtq vtq) { updateChannel.Writer.TryWrite(vtq); } ISubscription subscription = await client.SubscribeAsync(addresses, OnUpdate, cancellationToken); try { await foreach (Vtq update in updateChannel.Reader.ReadAllAsync(cancellationToken)) { yield return update; } } finally { await subscription.DisposeAsync(); } } } }