Files
scadalink-design/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/StreamingExtensions.cs
Joseph Doherty 2810306415 feat: add standalone LmxProxy solution, windev VM documentation
Split LmxProxy Host and Client into a self-contained solution under lmxproxy/,
ported from the ScadaBridge monorepo with updated namespaces (ZB.MOM.WW.LmxProxy.*).
Client project (.NET 10) inlines Core/DataEngine dependencies and builds clean.
Host project (.NET Fx 4.8) retains ArchestrA.MXAccess for Windows deployment.
Added windev.md documenting the WW_DEV_VM development environment setup.
2026-03-21 20:50:05 -04:00

261 lines
10 KiB
C#

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client
{
/// <summary>
/// Extension methods for streaming operations with the LmxProxy client
/// </summary>
public static class StreamingExtensions
{
/// <summary>
/// Reads multiple tag values as an async stream for efficient memory usage with large datasets
/// </summary>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to read</param>
/// <param name="batchSize">Size of each batch to process</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An async enumerable of tag values</returns>
public static async IAsyncEnumerable<KeyValuePair<string, Vtq>> ReadStreamAsync(
this ILmxProxyClient client,
IEnumerable<string> addresses,
int batchSize = 100,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(addresses);
if (batchSize <= 0)
throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive");
var batch = new List<string>(batchSize);
int errorCount = 0;
const int maxConsecutiveErrors = 3;
foreach (string address in addresses)
{
batch.Add(address);
if (batch.Count >= batchSize)
{
bool success = false;
int retries = 0;
const int maxRetries = 2;
while (!success && retries < maxRetries)
{
IDictionary<string, Vtq>? results = null;
Exception? lastException = null;
try
{
results = await client.ReadBatchAsync(batch, cancellationToken);
errorCount = 0; // Reset error count on success
success = true;
}
catch (OperationCanceledException)
{
throw; // Don't retry on cancellation
}
catch (Exception ex)
{
lastException = ex;
retries++;
errorCount++;
if (errorCount >= maxConsecutiveErrors)
{
throw new InvalidOperationException(
$"Stream reading failed after {maxConsecutiveErrors} consecutive errors", ex);
}
if (retries >= maxRetries)
{
// Log error and continue with next batch
System.Diagnostics.Debug.WriteLine($"Failed to read batch after {maxRetries} retries: {ex.Message}");
batch.Clear();
break;
}
// Wait before retry with exponential backoff
await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, retries - 1)), cancellationToken);
}
if (results != null)
{
foreach (KeyValuePair<string, Vtq> result in results)
{
yield return result;
}
batch.Clear();
}
}
}
cancellationToken.ThrowIfCancellationRequested();
}
// Process remaining items
if (batch.Count > 0)
{
IDictionary<string, Vtq>? results = null;
try
{
results = await client.ReadBatchAsync(batch, cancellationToken);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
// Log error for final batch but don't throw to allow partial results
System.Diagnostics.Debug.WriteLine($"Failed to read final batch: {ex.Message}");
}
if (results != null)
{
foreach (KeyValuePair<string, Vtq> result in results)
{
yield return result;
}
}
}
}
/// <summary>
/// Writes multiple tag values as an async stream for efficient memory usage with large datasets
/// </summary>
/// <param name="client">The LmxProxy client</param>
/// <param name="values">The values to write as an async enumerable</param>
/// <param name="batchSize">Size of each batch to process</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The number of values written</returns>
public static async Task<int> WriteStreamAsync(
this ILmxProxyClient client,
IAsyncEnumerable<KeyValuePair<string, object>> values,
int batchSize = 100,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(values);
if (batchSize <= 0)
throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive");
var batch = new Dictionary<string, object>(batchSize);
int totalWritten = 0;
await foreach (KeyValuePair<string, object> kvp in values.WithCancellation(cancellationToken))
{
batch[kvp.Key] = kvp.Value;
if (batch.Count >= batchSize)
{
await client.WriteBatchAsync(batch, cancellationToken);
totalWritten += batch.Count;
batch.Clear();
}
}
// Process remaining items
if (batch.Count > 0)
{
await client.WriteBatchAsync(batch, cancellationToken);
totalWritten += batch.Count;
}
return totalWritten;
}
/// <summary>
/// Processes tag values in parallel batches for maximum throughput
/// </summary>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to read</param>
/// <param name="processor">The async function to process each value</param>
/// <param name="maxDegreeOfParallelism">Maximum number of concurrent operations</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessInParallelAsync(
this ILmxProxyClient client,
IEnumerable<string> addresses,
Func<string, Vtq, Task> processor,
int maxDegreeOfParallelism = 4,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(addresses);
ArgumentNullException.ThrowIfNull(processor);
if (maxDegreeOfParallelism <= 0)
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
var tasks = new List<Task>();
await foreach (KeyValuePair<string, Vtq> kvp in client.ReadStreamAsync(addresses, cancellationToken: cancellationToken))
{
await semaphore.WaitAsync(cancellationToken);
var task = Task.Run(async () =>
{
try
{
await processor(kvp.Key, kvp.Value);
}
finally
{
semaphore.Release();
}
}, cancellationToken);
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
/// <summary>
/// Subscribes to multiple tags and returns updates as an async stream
/// </summary>
/// <param name="client">The LmxProxy client</param>
/// <param name="addresses">The addresses to subscribe to</param>
/// <param name="pollIntervalMs">Poll interval in milliseconds</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An async enumerable of tag updates</returns>
public static async IAsyncEnumerable<Vtq> SubscribeStreamAsync(
this ILmxProxyClient client,
IEnumerable<string> addresses,
int pollIntervalMs = 1000,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(addresses);
var updateChannel = System.Threading.Channels.Channel.CreateUnbounded<Vtq>();
// Setup update handler
void OnUpdate(string address, Vtq vtq)
{
updateChannel.Writer.TryWrite(vtq);
}
ISubscription subscription = await client.SubscribeAsync(addresses, OnUpdate, cancellationToken);
try
{
await foreach (Vtq update in updateChannel.Reader.ReadAllAsync(cancellationToken))
{
yield return update;
}
}
finally
{
await subscription.DisposeAsync();
}
}
}
}