diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs
index d8f67b5..c1c42de 100644
--- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs
@@ -3,11 +3,17 @@ namespace ZB.MOM.WW.LmxProxy.Client;
///
/// Configuration options for the LmxProxy client, typically set via the builder.
///
-public class ClientConfiguration
+internal class ClientConfiguration
{
/// Maximum number of retry attempts for transient failures.
- public int MaxRetryAttempts { get; set; } = 0;
+ public int MaxRetryAttempts { get; set; }
/// Base delay between retries (exponential backoff applied).
- public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
+ public TimeSpan RetryDelay { get; set; }
+
+ /// Whether client-side metrics collection is enabled.
+ public bool EnableMetrics { get; set; }
+
+ /// Optional header name for correlation ID propagation.
+ public string? CorrelationIdHeader { get; set; }
}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClientFactory.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClientFactory.cs
new file mode 100644
index 0000000..2b23252
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClientFactory.cs
@@ -0,0 +1,81 @@
+using Microsoft.Extensions.Configuration;
+
+namespace ZB.MOM.WW.LmxProxy.Client;
+
+///
+/// Factory for creating instances.
+///
+public interface ILmxProxyClientFactory
+{
+ /// Creates a client from the default "LmxProxy" configuration section.
+ LmxProxyClient CreateClient();
+
+ /// Creates a client from a named configuration section.
+ LmxProxyClient CreateClient(string configName);
+
+ /// Creates a client using a builder configuration action.
+ LmxProxyClient CreateClient(Action builderAction);
+}
+
+///
+/// Default implementation of that reads from IConfiguration.
+///
+public class LmxProxyClientFactory : ILmxProxyClientFactory
+{
+ private readonly IConfiguration _configuration;
+
+ /// Creates a new factory with the specified configuration.
+ public LmxProxyClientFactory(IConfiguration configuration)
+ {
+ _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+ }
+
+ ///
+ public LmxProxyClient CreateClient() => CreateClient("LmxProxy");
+
+ ///
+ public LmxProxyClient CreateClient(string configName)
+ {
+ IConfigurationSection section = _configuration.GetSection(configName);
+ var options = new LmxProxyClientOptions();
+ section.Bind(options);
+ return BuildFromOptions(options);
+ }
+
+ ///
+ public LmxProxyClient CreateClient(Action builderAction)
+ {
+ var builder = new LmxProxyClientBuilder();
+ builderAction(builder);
+ return builder.Build();
+ }
+
+ private static LmxProxyClient BuildFromOptions(LmxProxyClientOptions options)
+ {
+ var builder = new LmxProxyClientBuilder()
+ .WithHost(options.Host)
+ .WithPort(options.Port)
+ .WithTimeout(options.Timeout)
+ .WithRetryPolicy(options.Retry.MaxAttempts, options.Retry.Delay);
+
+ if (!string.IsNullOrEmpty(options.ApiKey))
+ builder.WithApiKey(options.ApiKey);
+
+ if (options.EnableMetrics)
+ builder.WithMetrics();
+
+ if (!string.IsNullOrEmpty(options.CorrelationIdHeader))
+ builder.WithCorrelationIdHeader(options.CorrelationIdHeader);
+
+ if (options.UseSsl)
+ {
+ builder.WithTlsConfiguration(new ClientTlsConfiguration
+ {
+ UseTls = true,
+ ServerCaCertificatePath = options.CertificatePath
+ });
+ }
+
+ return builder.Build();
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientBuilder.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientBuilder.cs
new file mode 100644
index 0000000..67ca7e4
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientBuilder.cs
@@ -0,0 +1,157 @@
+using Microsoft.Extensions.Logging;
+
+namespace ZB.MOM.WW.LmxProxy.Client;
+
+///
+/// Fluent builder for creating configured instances.
+///
+public class LmxProxyClientBuilder
+{
+ private string? _host;
+ private int _port = 50051;
+ private string? _apiKey;
+ private ILogger? _logger;
+ private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30);
+ private int _maxRetryAttempts = 3;
+ private TimeSpan _retryDelay = TimeSpan.FromSeconds(1);
+ private bool _enableMetrics;
+ private string? _correlationIdHeader;
+ private ClientTlsConfiguration? _tlsConfiguration;
+
+ /// Sets the host address of the LmxProxy server. Required.
+ public LmxProxyClientBuilder WithHost(string host)
+ {
+ if (string.IsNullOrWhiteSpace(host))
+ throw new ArgumentException("Host must not be null or empty.", nameof(host));
+ _host = host;
+ return this;
+ }
+
+ /// Sets the port of the LmxProxy server. Default is 50051.
+ public LmxProxyClientBuilder WithPort(int port)
+ {
+ if (port < 1 || port > 65535)
+ throw new ArgumentOutOfRangeException(nameof(port), "Port must be between 1 and 65535.");
+ _port = port;
+ return this;
+ }
+
+ /// Sets the API key for authentication.
+ public LmxProxyClientBuilder WithApiKey(string? apiKey)
+ {
+ _apiKey = apiKey;
+ return this;
+ }
+
+ /// Sets the logger instance for the client.
+ public LmxProxyClientBuilder WithLogger(ILogger logger)
+ {
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ return this;
+ }
+
+ /// Sets the default timeout for operations. Must be between 1 second and 10 minutes.
+ public LmxProxyClientBuilder WithTimeout(TimeSpan timeout)
+ {
+ if (timeout <= TimeSpan.Zero || timeout > TimeSpan.FromMinutes(10))
+ throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be greater than zero and at most 10 minutes.");
+ _defaultTimeout = timeout;
+ return this;
+ }
+
+ /// Enables TLS with an optional server CA certificate path.
+ public LmxProxyClientBuilder WithSslCredentials(string? certificatePath)
+ {
+ _tlsConfiguration ??= new ClientTlsConfiguration();
+ _tlsConfiguration.UseTls = true;
+ _tlsConfiguration.ServerCaCertificatePath = certificatePath;
+ return this;
+ }
+
+ /// Sets a full TLS configuration.
+ public LmxProxyClientBuilder WithTlsConfiguration(ClientTlsConfiguration config)
+ {
+ _tlsConfiguration = config ?? throw new ArgumentNullException(nameof(config));
+ return this;
+ }
+
+ /// Configures the retry policy. maxAttempts must be positive, retryDelay must be positive.
+ public LmxProxyClientBuilder WithRetryPolicy(int maxAttempts, TimeSpan retryDelay)
+ {
+ if (maxAttempts <= 0)
+ throw new ArgumentOutOfRangeException(nameof(maxAttempts), "Max retry attempts must be greater than zero.");
+ if (retryDelay <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException(nameof(retryDelay), "Retry delay must be greater than zero.");
+ _maxRetryAttempts = maxAttempts;
+ _retryDelay = retryDelay;
+ return this;
+ }
+
+ /// Enables client-side metrics collection.
+ public LmxProxyClientBuilder WithMetrics()
+ {
+ _enableMetrics = true;
+ return this;
+ }
+
+ /// Sets the correlation ID header name for request tracing.
+ public LmxProxyClientBuilder WithCorrelationIdHeader(string headerName)
+ {
+ if (string.IsNullOrEmpty(headerName))
+ throw new ArgumentException("Header name must not be null or empty.", nameof(headerName));
+ _correlationIdHeader = headerName;
+ return this;
+ }
+
+ ///
+ /// Builds and returns a configured instance.
+ ///
+ /// Thrown when host is not set.
+ /// Thrown when TLS certificate paths don't exist.
+ public LmxProxyClient Build()
+ {
+ if (string.IsNullOrWhiteSpace(_host))
+ throw new InvalidOperationException("Host must be specified. Call WithHost() before Build().");
+
+ ValidateTlsConfiguration();
+
+ var client = new LmxProxyClient(_host, _port, _apiKey, _tlsConfiguration, _logger)
+ {
+ DefaultTimeout = _defaultTimeout
+ };
+
+ client.SetBuilderConfiguration(new ClientConfiguration
+ {
+ MaxRetryAttempts = _maxRetryAttempts,
+ RetryDelay = _retryDelay,
+ EnableMetrics = _enableMetrics,
+ CorrelationIdHeader = _correlationIdHeader
+ });
+
+ return client;
+ }
+
+ private void ValidateTlsConfiguration()
+ {
+ if (_tlsConfiguration?.UseTls != true)
+ return;
+
+ if (!string.IsNullOrEmpty(_tlsConfiguration.ServerCaCertificatePath) &&
+ !File.Exists(_tlsConfiguration.ServerCaCertificatePath))
+ throw new FileNotFoundException(
+ $"Server CA certificate not found: {_tlsConfiguration.ServerCaCertificatePath}",
+ _tlsConfiguration.ServerCaCertificatePath);
+
+ if (!string.IsNullOrEmpty(_tlsConfiguration.ClientCertificatePath) &&
+ !File.Exists(_tlsConfiguration.ClientCertificatePath))
+ throw new FileNotFoundException(
+ $"Client certificate not found: {_tlsConfiguration.ClientCertificatePath}",
+ _tlsConfiguration.ClientCertificatePath);
+
+ if (!string.IsNullOrEmpty(_tlsConfiguration.ClientKeyPath) &&
+ !File.Exists(_tlsConfiguration.ClientKeyPath))
+ throw new FileNotFoundException(
+ $"Client key not found: {_tlsConfiguration.ClientKeyPath}",
+ _tlsConfiguration.ClientKeyPath);
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientOptions.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientOptions.cs
new file mode 100644
index 0000000..9933dd8
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClientOptions.cs
@@ -0,0 +1,46 @@
+namespace ZB.MOM.WW.LmxProxy.Client;
+
+///
+/// Configuration options for creating an LmxProxy client from IConfiguration sections.
+///
+public class LmxProxyClientOptions
+{
+ /// Host address of the LmxProxy server.
+ public string Host { get; set; } = "localhost";
+
+ /// Port of the LmxProxy server.
+ public int Port { get; set; } = 50051;
+
+ /// API key for authentication.
+ public string? ApiKey { get; set; }
+
+ /// Default timeout for operations.
+ public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ /// Whether to use TLS for the connection.
+ public bool UseSsl { get; set; }
+
+ /// Path to the server CA certificate for TLS.
+ public string? CertificatePath { get; set; }
+
+ /// Whether to enable client-side metrics collection.
+ public bool EnableMetrics { get; set; }
+
+ /// Optional header name for correlation ID propagation.
+ public string? CorrelationIdHeader { get; set; }
+
+ /// Retry policy options.
+ public RetryOptions Retry { get; set; } = new();
+}
+
+///
+/// Retry policy configuration options.
+///
+public class RetryOptions
+{
+ /// Maximum number of retry attempts.
+ public int MaxAttempts { get; set; } = 3;
+
+ /// Base delay between retries.
+ public TimeSpan Delay { get; set; } = TimeSpan.FromSeconds(1);
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ServiceCollectionExtensions.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..ac75089
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ServiceCollectionExtensions.cs
@@ -0,0 +1,71 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace ZB.MOM.WW.LmxProxy.Client;
+
+///
+/// Extension methods for registering LmxProxy client services in the DI container.
+///
+public static class ServiceCollectionExtensions
+{
+ /// Registers a singleton ILmxProxyClient from the "LmxProxy" config section.
+ public static IServiceCollection AddLmxProxyClient(
+ this IServiceCollection services, IConfiguration configuration)
+ {
+ return services.AddLmxProxyClient(configuration, "LmxProxy");
+ }
+
+ /// Registers a singleton ILmxProxyClient from a named config section.
+ public static IServiceCollection AddLmxProxyClient(
+ this IServiceCollection services, IConfiguration configuration, string sectionName)
+ {
+ services.AddSingleton(
+ sp => new LmxProxyClientFactory(configuration));
+ services.AddSingleton(sp =>
+ {
+ var factory = sp.GetRequiredService();
+ return factory.CreateClient(sectionName);
+ });
+ return services;
+ }
+
+ /// Registers a singleton ILmxProxyClient via builder action.
+ public static IServiceCollection AddLmxProxyClient(
+ this IServiceCollection services, Action configure)
+ {
+ services.AddSingleton(sp =>
+ {
+ var builder = new LmxProxyClientBuilder();
+ configure(builder);
+ return builder.Build();
+ });
+ return services;
+ }
+
+ /// Registers a scoped ILmxProxyClient from the "LmxProxy" config section.
+ public static IServiceCollection AddScopedLmxProxyClient(
+ this IServiceCollection services, IConfiguration configuration)
+ {
+ services.AddSingleton(
+ sp => new LmxProxyClientFactory(configuration));
+ services.AddScoped(sp =>
+ {
+ var factory = sp.GetRequiredService();
+ return factory.CreateClient();
+ });
+ return services;
+ }
+
+ /// Registers a keyed singleton ILmxProxyClient.
+ public static IServiceCollection AddNamedLmxProxyClient(
+ this IServiceCollection services, string name, Action configure)
+ {
+ services.AddKeyedSingleton(name, (sp, key) =>
+ {
+ var builder = new LmxProxyClientBuilder();
+ configure(builder);
+ return builder.Build();
+ });
+ return services;
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/StreamingExtensions.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/StreamingExtensions.cs
new file mode 100644
index 0000000..f0854b5
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/StreamingExtensions.cs
@@ -0,0 +1,218 @@
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+using ZB.MOM.WW.LmxProxy.Client.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Client;
+
+///
+/// Extension methods for streaming reads, writes, and subscriptions over ILmxProxyClient.
+///
+public static class StreamingExtensions
+{
+ ///
+ /// Reads multiple tags as an async stream in batches.
+ /// Retries up to 2 times per batch. Aborts after 3 consecutive batch errors.
+ ///
+ public static async IAsyncEnumerable> ReadStreamAsync(
+ this ILmxProxyClient client,
+ IEnumerable addresses,
+ int batchSize = 100,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(addresses);
+ if (batchSize <= 0)
+ throw new ArgumentOutOfRangeException(nameof(batchSize));
+
+ var batch = new List(batchSize);
+ int consecutiveErrors = 0;
+ const int maxConsecutiveErrors = 3;
+ const int maxRetries = 2;
+
+ foreach (string address in addresses)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ batch.Add(address);
+
+ if (batch.Count >= batchSize)
+ {
+ bool success = false;
+ await foreach (var kvp in ReadBatchWithRetry(
+ client, batch, maxRetries, cancellationToken))
+ {
+ consecutiveErrors = 0;
+ success = true;
+ yield return kvp;
+ }
+ if (!success)
+ {
+ consecutiveErrors++;
+ if (consecutiveErrors >= maxConsecutiveErrors)
+ yield break;
+ }
+ batch.Clear();
+ }
+ }
+
+ // Process remaining
+ if (batch.Count > 0)
+ {
+ await foreach (var kvp in ReadBatchWithRetry(
+ client, batch, maxRetries, cancellationToken))
+ {
+ yield return kvp;
+ }
+ }
+ }
+
+ private static async IAsyncEnumerable> ReadBatchWithRetry(
+ ILmxProxyClient client,
+ List batch,
+ int maxRetries,
+ [EnumeratorCancellation] CancellationToken ct)
+ {
+ int retries = 0;
+ while (retries <= maxRetries)
+ {
+ IDictionary? results = null;
+ try
+ {
+ results = await client.ReadBatchAsync(batch, ct);
+ }
+ catch when (retries < maxRetries)
+ {
+ retries++;
+ continue;
+ }
+
+ if (results is not null)
+ {
+ foreach (var kvp in results)
+ yield return kvp;
+ yield break;
+ }
+ retries++;
+ }
+ }
+
+ ///
+ /// Writes values from an async enumerable in batches. Returns total count written.
+ ///
+ public static async Task WriteStreamAsync(
+ this ILmxProxyClient client,
+ IAsyncEnumerable> values,
+ int batchSize = 100,
+ CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(values);
+ if (batchSize <= 0)
+ throw new ArgumentOutOfRangeException(nameof(batchSize));
+
+ var batch = new Dictionary(batchSize);
+ int totalWritten = 0;
+
+ await foreach (var kvp in values.WithCancellation(cancellationToken))
+ {
+ batch[kvp.Key] = kvp.Value;
+
+ if (batch.Count >= batchSize)
+ {
+ await client.WriteBatchAsync(batch, cancellationToken);
+ totalWritten += batch.Count;
+ batch.Clear();
+ }
+ }
+
+ if (batch.Count > 0)
+ {
+ await client.WriteBatchAsync(batch, cancellationToken);
+ totalWritten += batch.Count;
+ }
+
+ return totalWritten;
+ }
+
+ ///
+ /// Processes items in parallel with a configurable max concurrency (default 4).
+ ///
+ public static async Task ProcessInParallelAsync(
+ this IAsyncEnumerable source,
+ Func processor,
+ int maxConcurrency = 4,
+ CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(processor);
+ if (maxConcurrency <= 0)
+ throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
+
+ using var semaphore = new SemaphoreSlim(maxConcurrency);
+ var tasks = new List();
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ await semaphore.WaitAsync(cancellationToken);
+
+ tasks.Add(Task.Run(async () =>
+ {
+ try
+ {
+ await processor(item, cancellationToken);
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ }, cancellationToken));
+ }
+
+ await Task.WhenAll(tasks);
+ }
+
+ ///
+ /// Wraps a callback-based subscription into an IAsyncEnumerable via System.Threading.Channels.
+ ///
+ public static async IAsyncEnumerable<(string Tag, Vtq Vtq)> SubscribeStreamAsync(
+ this ILmxProxyClient client,
+ IEnumerable addresses,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(addresses);
+
+ var channel = Channel.CreateBounded<(string, Vtq)>(
+ new BoundedChannelOptions(1000)
+ {
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = false
+ });
+
+ LmxProxyClient.ISubscription? subscription = null;
+ try
+ {
+ subscription = await client.SubscribeAsync(
+ addresses,
+ (tag, vtq) =>
+ {
+ channel.Writer.TryWrite((tag, vtq));
+ },
+ ex =>
+ {
+ channel.Writer.TryComplete(ex);
+ },
+ cancellationToken);
+
+ await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
+ {
+ yield return item;
+ }
+ }
+ finally
+ {
+ subscription?.Dispose();
+ channel.Writer.TryComplete();
+ }
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeLmxProxyClient.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeLmxProxyClient.cs
new file mode 100644
index 0000000..e44e361
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeLmxProxyClient.cs
@@ -0,0 +1,91 @@
+using ZB.MOM.WW.LmxProxy.Client.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
+
+///
+/// Hand-written fake implementation of ILmxProxyClient for unit testing streaming extensions.
+///
+internal class FakeLmxProxyClient : ILmxProxyClient
+{
+ public TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ // Track calls
+ public List> ReadBatchCalls { get; } = [];
+ public List> WriteBatchCalls { get; } = [];
+ public List> SubscribeCalls { get; } = [];
+
+ // Configurable responses
+ public Func, CancellationToken, Task>>? ReadBatchHandler { get; set; }
+ public Exception? ReadBatchExceptionToThrow { get; set; }
+ public int ReadBatchExceptionCount { get; set; }
+ private int _readBatchCallCount;
+
+ // Subscription support
+ public Action? CapturedOnUpdate { get; private set; }
+ public Action? CapturedOnError { get; private set; }
+
+ public Task ConnectAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
+ public Task DisconnectAsync() => Task.CompletedTask;
+ public Task IsConnectedAsync() => Task.FromResult(true);
+
+ public Task ReadAsync(string address, CancellationToken cancellationToken = default)
+ => Task.FromResult(new Vtq(null, DateTime.UtcNow, Quality.Good));
+
+ public Task> ReadBatchAsync(IEnumerable addresses, CancellationToken cancellationToken = default)
+ {
+ var addressList = addresses.ToList();
+ ReadBatchCalls.Add(addressList);
+ _readBatchCallCount++;
+
+ if (ReadBatchExceptionToThrow is not null && _readBatchCallCount <= ReadBatchExceptionCount)
+ throw ReadBatchExceptionToThrow;
+
+ if (ReadBatchHandler is not null)
+ return ReadBatchHandler(addressList, cancellationToken);
+
+ var result = new Dictionary();
+ foreach (var addr in addressList)
+ result[addr] = new Vtq(42.0, DateTime.UtcNow, Quality.Good);
+ return Task.FromResult>(result);
+ }
+
+ public Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default)
+ => Task.CompletedTask;
+
+ public Task WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default)
+ {
+ WriteBatchCalls.Add(new Dictionary(values));
+ return Task.CompletedTask;
+ }
+
+ public Task WriteBatchAndWaitAsync(
+ IDictionary values, string flagTag, TypedValue flagValue,
+ int timeoutMs = 5000, int pollIntervalMs = 100, CancellationToken cancellationToken = default)
+ => Task.FromResult(new WriteBatchAndWaitResponse { Success = true });
+
+ public Task SubscribeAsync(
+ IEnumerable addresses,
+ Action onUpdate,
+ Action? onStreamError = null,
+ CancellationToken cancellationToken = default)
+ {
+ SubscribeCalls.Add(addresses);
+ CapturedOnUpdate = onUpdate;
+ CapturedOnError = onStreamError;
+ return Task.FromResult(new FakeSubscription());
+ }
+
+ public Task CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default)
+ => Task.FromResult(new LmxProxyClient.ApiKeyInfo { IsValid = true });
+
+ public Dictionary GetMetrics() => [];
+
+ public void Dispose() { }
+ public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+
+ private class FakeSubscription : LmxProxyClient.ISubscription
+ {
+ public void Dispose() { }
+ public Task DisposeAsync() => Task.CompletedTask;
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientBuilderTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientBuilderTests.cs
new file mode 100644
index 0000000..fc15300
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientBuilderTests.cs
@@ -0,0 +1,106 @@
+using Xunit;
+
+namespace ZB.MOM.WW.LmxProxy.Client.Tests;
+
+public class LmxProxyClientBuilderTests
+{
+ [Fact]
+ public void Build_ThrowsWhenHostNotSet()
+ {
+ var builder = new LmxProxyClientBuilder();
+ Assert.Throws(() => builder.Build());
+ }
+
+ [Fact]
+ public void Build_DefaultPort_Is50051()
+ {
+ var client = new LmxProxyClientBuilder()
+ .WithHost("localhost")
+ .Build();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void WithPort_ThrowsOnZero()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithPort(0));
+ }
+
+ [Fact]
+ public void WithPort_ThrowsOn65536()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithPort(65536));
+ }
+
+ [Fact]
+ public void WithTimeout_ThrowsOnNegative()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithTimeout(TimeSpan.FromSeconds(-1)));
+ }
+
+ [Fact]
+ public void WithTimeout_ThrowsOver10Minutes()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithTimeout(TimeSpan.FromMinutes(11)));
+ }
+
+ [Fact]
+ public void WithRetryPolicy_ThrowsOnZeroAttempts()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithRetryPolicy(0, TimeSpan.FromSeconds(1)));
+ }
+
+ [Fact]
+ public void WithRetryPolicy_ThrowsOnZeroDelay()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithRetryPolicy(3, TimeSpan.Zero));
+ }
+
+ [Fact]
+ public void Build_WithAllOptions_Succeeds()
+ {
+ var client = new LmxProxyClientBuilder()
+ .WithHost("10.100.0.48")
+ .WithPort(50051)
+ .WithApiKey("test-key")
+ .WithTimeout(TimeSpan.FromSeconds(15))
+ .WithRetryPolicy(5, TimeSpan.FromSeconds(2))
+ .WithMetrics()
+ .WithCorrelationIdHeader("X-Correlation-ID")
+ .Build();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void Build_WithTls_ValidatesCertificatePaths()
+ {
+ var builder = new LmxProxyClientBuilder()
+ .WithHost("localhost")
+ .WithTlsConfiguration(new ClientTlsConfiguration
+ {
+ UseTls = true,
+ ServerCaCertificatePath = "/nonexistent/cert.pem"
+ });
+ Assert.Throws(() => builder.Build());
+ }
+
+ [Fact]
+ public void WithHost_ThrowsOnNull()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithHost(null!));
+ }
+
+ [Fact]
+ public void WithHost_ThrowsOnEmpty()
+ {
+ Assert.Throws(() =>
+ new LmxProxyClientBuilder().WithHost(""));
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientFactoryTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientFactoryTests.cs
new file mode 100644
index 0000000..3b936a3
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientFactoryTests.cs
@@ -0,0 +1,51 @@
+using Microsoft.Extensions.Configuration;
+using Xunit;
+
+namespace ZB.MOM.WW.LmxProxy.Client.Tests;
+
+public class LmxProxyClientFactoryTests
+{
+ [Fact]
+ public void CreateClient_BindsFromConfiguration()
+ {
+ var config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ ["LmxProxy:Host"] = "10.100.0.48",
+ ["LmxProxy:Port"] = "50052",
+ ["LmxProxy:ApiKey"] = "test-key",
+ ["LmxProxy:Retry:MaxAttempts"] = "5",
+ ["LmxProxy:Retry:Delay"] = "00:00:02",
+ })
+ .Build();
+
+ var factory = new LmxProxyClientFactory(config);
+ var client = factory.CreateClient();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void CreateClient_NamedSection()
+ {
+ var config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ ["MyProxy:Host"] = "10.100.0.48",
+ ["MyProxy:Port"] = "50052",
+ })
+ .Build();
+
+ var factory = new LmxProxyClientFactory(config);
+ var client = factory.CreateClient("MyProxy");
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void CreateClient_BuilderAction()
+ {
+ var config = new ConfigurationBuilder().Build();
+ var factory = new LmxProxyClientFactory(config);
+ var client = factory.CreateClient(b => b.WithHost("localhost").WithPort(50051));
+ Assert.NotNull(client);
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ServiceCollectionExtensionsTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ServiceCollectionExtensionsTests.cs
new file mode 100644
index 0000000..890e3f0
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ServiceCollectionExtensionsTests.cs
@@ -0,0 +1,92 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Xunit;
+
+namespace ZB.MOM.WW.LmxProxy.Client.Tests;
+
+public class ServiceCollectionExtensionsTests
+{
+ [Fact]
+ public void AddLmxProxyClient_WithConfiguration_RegistersSingleton()
+ {
+ var config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ ["LmxProxy:Host"] = "localhost",
+ ["LmxProxy:Port"] = "50051",
+ })
+ .Build();
+
+ var services = new ServiceCollection();
+ services.AddLmxProxyClient(config);
+
+ using var provider = services.BuildServiceProvider();
+ var client = provider.GetRequiredService();
+ Assert.NotNull(client);
+ Assert.IsType(client);
+ }
+
+ [Fact]
+ public void AddLmxProxyClient_WithBuilderAction_RegistersSingleton()
+ {
+ var services = new ServiceCollection();
+ services.AddLmxProxyClient(b => b.WithHost("localhost").WithPort(50051));
+
+ using var provider = services.BuildServiceProvider();
+ var client = provider.GetRequiredService();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void AddLmxProxyClient_WithNamedSection_RegistersSingleton()
+ {
+ var config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ ["CustomProxy:Host"] = "10.0.0.1",
+ ["CustomProxy:Port"] = "50052",
+ })
+ .Build();
+
+ var services = new ServiceCollection();
+ services.AddLmxProxyClient(config, "CustomProxy");
+
+ using var provider = services.BuildServiceProvider();
+ var client = provider.GetRequiredService();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void AddScopedLmxProxyClient_RegistersScoped()
+ {
+ var config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ ["LmxProxy:Host"] = "localhost",
+ })
+ .Build();
+
+ var services = new ServiceCollection();
+ services.AddScopedLmxProxyClient(config);
+
+ using var provider = services.BuildServiceProvider();
+ using var scope = provider.CreateScope();
+ var client = scope.ServiceProvider.GetRequiredService();
+ Assert.NotNull(client);
+ }
+
+ [Fact]
+ public void AddNamedLmxProxyClient_RegistersKeyedSingleton()
+ {
+ var services = new ServiceCollection();
+ services.AddNamedLmxProxyClient("primary", b => b.WithHost("host-a").WithPort(50051));
+ services.AddNamedLmxProxyClient("secondary", b => b.WithHost("host-b").WithPort(50052));
+
+ using var provider = services.BuildServiceProvider();
+ var primary = provider.GetRequiredKeyedService("primary");
+ var secondary = provider.GetRequiredKeyedService("secondary");
+ Assert.NotNull(primary);
+ Assert.NotNull(secondary);
+ Assert.NotSame(primary, secondary);
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/StreamingExtensionsTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/StreamingExtensionsTests.cs
new file mode 100644
index 0000000..383d0bc
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/StreamingExtensionsTests.cs
@@ -0,0 +1,157 @@
+using Xunit;
+using ZB.MOM.WW.LmxProxy.Client.Domain;
+using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
+
+namespace ZB.MOM.WW.LmxProxy.Client.Tests;
+
+public class StreamingExtensionsTests
+{
+ [Fact]
+ public async Task ReadStreamAsync_BatchesCorrectly()
+ {
+ var fake = new FakeLmxProxyClient();
+ var addresses = Enumerable.Range(0, 250).Select(i => $"tag{i}").ToList();
+
+ var results = new List>();
+ await foreach (var kvp in fake.ReadStreamAsync(addresses, batchSize: 100))
+ {
+ results.Add(kvp);
+ }
+
+ // 250 tags at batchSize=100 => 3 batch calls (100, 100, 50)
+ Assert.Equal(3, fake.ReadBatchCalls.Count);
+ Assert.Equal(100, fake.ReadBatchCalls[0].Count);
+ Assert.Equal(100, fake.ReadBatchCalls[1].Count);
+ Assert.Equal(50, fake.ReadBatchCalls[2].Count);
+ Assert.Equal(250, results.Count);
+ }
+
+ [Fact]
+ public async Task ReadStreamAsync_RetriesOnError()
+ {
+ var fake = new FakeLmxProxyClient
+ {
+ ReadBatchExceptionToThrow = new InvalidOperationException("transient"),
+ ReadBatchExceptionCount = 1 // First call throws, second succeeds
+ };
+
+ var addresses = Enumerable.Range(0, 5).Select(i => $"tag{i}").ToList();
+ var results = new List>();
+ await foreach (var kvp in fake.ReadStreamAsync(addresses, batchSize: 10))
+ {
+ results.Add(kvp);
+ }
+
+ // Should retry: first call throws, second succeeds
+ Assert.Equal(2, fake.ReadBatchCalls.Count);
+ Assert.Equal(5, results.Count);
+ }
+
+ [Fact]
+ public async Task WriteStreamAsync_BatchesAndReturnsCount()
+ {
+ var fake = new FakeLmxProxyClient();
+ var values = GenerateWriteValues(250);
+
+ int total = await fake.WriteStreamAsync(values, batchSize: 100);
+
+ Assert.Equal(250, total);
+ Assert.Equal(3, fake.WriteBatchCalls.Count);
+ Assert.Equal(100, fake.WriteBatchCalls[0].Count);
+ Assert.Equal(100, fake.WriteBatchCalls[1].Count);
+ Assert.Equal(50, fake.WriteBatchCalls[2].Count);
+ }
+
+ [Fact]
+ public async Task ProcessInParallelAsync_RespectsMaxConcurrency()
+ {
+ int maxConcurrency = 2;
+ int currentConcurrency = 0;
+ int maxObservedConcurrency = 0;
+ var lockObj = new object();
+
+ var source = GenerateAsyncSequence(10);
+
+ await source.ProcessInParallelAsync(async (item, ct) =>
+ {
+ int current;
+ lock (lockObj)
+ {
+ currentConcurrency++;
+ current = currentConcurrency;
+ if (current > maxObservedConcurrency)
+ maxObservedConcurrency = current;
+ }
+
+ await Task.Delay(50, ct);
+
+ lock (lockObj)
+ {
+ currentConcurrency--;
+ }
+ }, maxConcurrency: maxConcurrency);
+
+ Assert.True(maxObservedConcurrency <= maxConcurrency,
+ $"Max observed concurrency {maxObservedConcurrency} exceeded limit {maxConcurrency}");
+ }
+
+ [Fact]
+ public async Task SubscribeStreamAsync_YieldsFromChannel()
+ {
+ var fake = new FakeLmxProxyClient();
+ var addresses = new[] { "tag1", "tag2" };
+ using var cts = new CancellationTokenSource();
+
+ var results = new List<(string Tag, Vtq Vtq)>();
+
+ // Start the subscription stream in a background task
+ var streamTask = Task.Run(async () =>
+ {
+ await foreach (var item in fake.SubscribeStreamAsync(addresses, cts.Token))
+ {
+ results.Add(item);
+ if (results.Count >= 3)
+ await cts.CancelAsync();
+ }
+ });
+
+ // Wait for subscribe to be called with a polling loop
+ for (int i = 0; i < 50 && fake.CapturedOnUpdate is null; i++)
+ await Task.Delay(50);
+
+ // Simulate updates via captured callback
+ Assert.NotNull(fake.CapturedOnUpdate);
+ fake.CapturedOnUpdate!("tag1", new Vtq(1.0, DateTime.UtcNow, Quality.Good));
+ fake.CapturedOnUpdate!("tag2", new Vtq(2.0, DateTime.UtcNow, Quality.Good));
+ fake.CapturedOnUpdate!("tag1", new Vtq(3.0, DateTime.UtcNow, Quality.Good));
+
+ // Wait for stream task to complete (cancelled after 3 items)
+ try { await streamTask; }
+ catch (OperationCanceledException) { }
+
+ Assert.Equal(3, results.Count);
+ Assert.Equal("tag1", results[0].Tag);
+ Assert.Equal("tag2", results[1].Tag);
+ Assert.Equal("tag1", results[2].Tag);
+ }
+
+ private static async IAsyncEnumerable> GenerateWriteValues(int count)
+ {
+ for (int i = 0; i < count; i++)
+ {
+ yield return new KeyValuePair(
+ $"tag{i}",
+ new TypedValue { DoubleValue = i * 1.0 });
+ await Task.Yield();
+ }
+ }
+
+ private static async IAsyncEnumerable GenerateAsyncSequence(int count)
+ {
+ for (int i = 0; i < count; i++)
+ {
+ yield return i;
+ await Task.Yield();
+ }
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj
index 9e2f765..11ba36d 100644
--- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj
@@ -18,6 +18,9 @@
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive