Files
scadalink-design/lmxproxy/docs/plans/phase-5-client-core.md
Joseph Doherty 4303f06fc3 docs(lmxproxy): add v2 rebuild design, 7-phase implementation plans, and execution prompt
Design doc covers architecture, v2 protocol (TypedValue/QualityCode), COM threading
model, session lifecycle, subscription semantics, error model, and guardrails.
Implementation plans are detailed enough for autonomous Claude Code execution.
Verified all dev tooling on windev (Grpc.Tools, protobuf-net.Grpc, Polly v8, xUnit).
2026-03-21 23:29:42 -04:00

30 KiB

Phase 5: Client Core — Implementation Plan

Date: 2026-03-21 Prerequisites: Phase 1 complete and passing (Protocol & Domain Types — ScadaContracts.cs with v2 TypedValue/QualityCode messages, Quality.cs, QualityExtensions.cs, Vtq.cs, ConnectionState.cs all exist and cross-stack serialization tests pass) Working Directory: The lmxproxy repo is on windev at C:\src\lmxproxy

Guardrails

  1. Client targets .NET 10, AnyCPU — use latest C# features freely. The csproj <TargetFramework> is net10.0, <LangVersion>latest</LangVersion>.
  2. Code-first gRPC only — the Client uses protobuf-net.Grpc with [ServiceContract]/[DataContract] attributes. Never reference proto files or Grpc.Tools.
  3. No string serialization heuristics — v2 uses native TypedValue. Do not write double.TryParse, bool.TryParse, or any string-to-value parsing on tag values.
  4. status_code is canonical for qualitysymbolic_name is derived. Never set symbolic_name independently.
  5. Polly v8 API — the Client csproj already has <PackageReference Include="Polly" Version="8.5.2" />. Use the v8 ResiliencePipeline API, not the legacy v7 IAsyncPolicy API.
  6. No new NuGet packages — all needed packages are already in src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj.
  7. Build command: dotnet build src/ZB.MOM.WW.LmxProxy.Client
  8. Test command: dotnet test tests/ZB.MOM.WW.LmxProxy.Client.Tests
  9. Namespace root: ZB.MOM.WW.LmxProxy.Client

Step 1: ClientTlsConfiguration

File: src/ZB.MOM.WW.LmxProxy.Client/ClientTlsConfiguration.cs

This file already exists with the correct shape. Verify it has all these properties (from Component-Client.md):

namespace ZB.MOM.WW.LmxProxy.Client;

public class ClientTlsConfiguration
{
    public bool UseTls { get; set; } = false;
    public string? ClientCertificatePath { get; set; }
    public string? ClientKeyPath { get; set; }
    public string? ServerCaCertificatePath { get; set; }
    public string? ServerNameOverride { get; set; }
    public bool ValidateServerCertificate { get; set; } = true;
    public bool AllowSelfSignedCertificates { get; set; } = false;
    public bool IgnoreAllCertificateErrors { get; set; } = false;
}

If it matches, no changes needed. If any properties are missing, add them.

Step 2: Security/GrpcChannelFactory

File: src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs

This file already exists. Verify the implementation covers:

  1. CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger) — returns GrpcChannel.
  2. Creates SocketsHttpHandler with EnableMultipleHttp2Connections = true.
  3. For TLS: sets SslProtocols = Tls12 | Tls13, configures ServerNameOverride as TargetHost, loads client certificate from PEM files for mTLS.
  4. Certificate validation callback handles: IgnoreAllCertificateErrors, !ValidateServerCertificate, custom CA trust store via ServerCaCertificatePath, AllowSelfSignedCertificates.
  5. Static constructor sets System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport = true for non-TLS.

The existing implementation matches. No changes expected unless Phase 1 introduced breaking changes.

Step 3: ILmxProxyClient Interface

File: src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClient.cs

Rewrite for v2 protocol. The key changes from v1:

  • WriteAsync and WriteBatchAsync accept TypedValue instead of object
  • SubscribeAsync has an onStreamError callback parameter
  • CheckApiKeyAsync is added
  • Return types use v2 domain Vtq (which wraps TypedValue + QualityCode)
using ZB.MOM.WW.LmxProxy.Client.Domain;

namespace ZB.MOM.WW.LmxProxy.Client;

/// <summary>
/// Interface for LmxProxy client operations.
/// </summary>
public interface ILmxProxyClient : IDisposable, IAsyncDisposable
{
    /// <summary>Gets or sets the default timeout for operations (range: 1s to 10min).</summary>
    TimeSpan DefaultTimeout { get; set; }

    /// <summary>Connects to the LmxProxy service and establishes a session.</summary>
    Task ConnectAsync(CancellationToken cancellationToken = default);

    /// <summary>Disconnects from the LmxProxy service.</summary>
    Task DisconnectAsync();

    /// <summary>Returns true if the client has an active session.</summary>
    Task<bool> IsConnectedAsync();

    /// <summary>Reads a single tag value.</summary>
    Task<Vtq> ReadAsync(string address, CancellationToken cancellationToken = default);

    /// <summary>Reads multiple tag values in a single batch.</summary>
    Task<IDictionary<string, Vtq>> ReadBatchAsync(IEnumerable<string> addresses, CancellationToken cancellationToken = default);

    /// <summary>Writes a single tag value (native TypedValue — no string heuristics).</summary>
    Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default);

    /// <summary>Writes multiple tag values in a single batch.</summary>
    Task WriteBatchAsync(IDictionary<string, TypedValue> values, CancellationToken cancellationToken = default);

    /// <summary>
    /// Writes a batch of values, then polls a flag tag until it matches or timeout expires.
    /// Returns (writeResults, flagReached, elapsedMs).
    /// </summary>
    Task<WriteBatchAndWaitResponse> WriteBatchAndWaitAsync(
        IDictionary<string, TypedValue> values,
        string flagTag,
        TypedValue flagValue,
        int timeoutMs = 5000,
        int pollIntervalMs = 100,
        CancellationToken cancellationToken = default);

    /// <summary>Subscribes to tag updates with value and error callbacks.</summary>
    Task<ISubscription> SubscribeAsync(
        IEnumerable<string> addresses,
        Action<string, Vtq> onUpdate,
        Action<Exception>? onStreamError = null,
        CancellationToken cancellationToken = default);

    /// <summary>Validates an API key and returns info.</summary>
    Task<ApiKeyInfo> CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default);

    /// <summary>Returns a snapshot of client-side metrics.</summary>
    Dictionary<string, object> GetMetrics();
}

Note: The TypedValue class referenced here is from Domain/ScadaContracts.cs — it should already have been updated in Phase 1 to use [DataContract] with the v2 oneof-style properties (e.g., BoolValue, Int32Value, DoubleValue, StringValue, DatetimeValue, etc., with a ValueCase enum or similar discriminator).

Step 4: LmxProxyClient — Main File

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.cs

This is a partial class. The main file contains the constructor, fields, properties, and the Read/Write/WriteBatch/WriteBatchAndWait/CheckApiKey methods.

4.1 Fields and Constructor

public partial class LmxProxyClient : ILmxProxyClient
{
    private readonly ILogger<LmxProxyClient> _logger;
    private readonly string _host;
    private readonly int _port;
    private readonly string? _apiKey;
    private readonly ClientTlsConfiguration? _tlsConfiguration;
    private readonly ClientMetrics _metrics = new();
    private readonly SemaphoreSlim _connectionLock = new(1, 1);
    private readonly List<ISubscription> _activeSubscriptions = [];
    private readonly Lock _subscriptionLock = new();

    private GrpcChannel? _channel;
    private IScadaService? _client;
    private string _sessionId = string.Empty;
    private bool _disposed;
    private bool _isConnected;
    private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30);
    private ClientConfiguration? _configuration;
    private ResiliencePipeline? _resiliencePipeline;  // Polly v8
    private Timer? _keepAliveTimer;
    private readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30);

    // IsConnected computed property
    public bool IsConnected => !_disposed && _isConnected && !string.IsNullOrEmpty(_sessionId);

    public LmxProxyClient(
        string host, int port, string? apiKey,
        ClientTlsConfiguration? tlsConfiguration,
        ILogger<LmxProxyClient>? logger = null)
    {
        _host = host ?? throw new ArgumentNullException(nameof(host));
        _port = port;
        _apiKey = apiKey;
        _tlsConfiguration = tlsConfiguration;
        _logger = logger ?? NullLogger<LmxProxyClient>.Instance;
    }

    internal void SetBuilderConfiguration(ClientConfiguration config)
    {
        _configuration = config;
        // Build Polly v8 ResiliencePipeline from config
        if (config.MaxRetryAttempts > 0)
        {
            _resiliencePipeline = new ResiliencePipelineBuilder()
                .AddRetry(new RetryStrategyOptions
                {
                    MaxRetryAttempts = config.MaxRetryAttempts,
                    Delay = config.RetryDelay,
                    BackoffType = DelayBackoffType.Exponential,
                    ShouldHandle = new PredicateBuilder()
                        .Handle<RpcException>(ex =>
                            ex.StatusCode == StatusCode.Unavailable ||
                            ex.StatusCode == StatusCode.DeadlineExceeded ||
                            ex.StatusCode == StatusCode.ResourceExhausted ||
                            ex.StatusCode == StatusCode.Aborted),
                    OnRetry = args =>
                    {
                        _logger.LogWarning("Retry {Attempt} after {Delay} for {Exception}",
                            args.AttemptNumber, args.RetryDelay, args.Outcome.Exception?.Message);
                        return ValueTask.CompletedTask;
                    }
                })
                .Build();
        }
    }
}

4.2 ReadAsync

public async Task<Vtq> ReadAsync(string address, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    _metrics.IncrementOperationCount("Read");
    var sw = Stopwatch.StartNew();
    try
    {
        var request = new ReadRequest { SessionId = _sessionId, Tag = address };
        ReadResponse response = await ExecuteWithRetry(
            () => _client!.ReadAsync(request).AsTask(), cancellationToken);
        if (!response.Success)
            throw new InvalidOperationException($"Read failed: {response.Message}");
        return ConvertVtqMessage(response.Vtq);
    }
    catch (Exception ex)
    {
        _metrics.IncrementErrorCount("Read");
        throw;
    }
    finally
    {
        sw.Stop();
        _metrics.RecordLatency("Read", sw.ElapsedMilliseconds);
    }
}

4.3 ReadBatchAsync

public async Task<IDictionary<string, Vtq>> ReadBatchAsync(
    IEnumerable<string> addresses, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    _metrics.IncrementOperationCount("ReadBatch");
    var sw = Stopwatch.StartNew();
    try
    {
        var request = new ReadBatchRequest { SessionId = _sessionId, Tags = addresses.ToList() };
        ReadBatchResponse response = await ExecuteWithRetry(
            () => _client!.ReadBatchAsync(request).AsTask(), cancellationToken);
        var result = new Dictionary<string, Vtq>();
        foreach (var vtqMsg in response.Vtqs)
        {
            result[vtqMsg.Tag] = ConvertVtqMessage(vtqMsg);
        }
        return result;
    }
    catch
    {
        _metrics.IncrementErrorCount("ReadBatch");
        throw;
    }
    finally
    {
        sw.Stop();
        _metrics.RecordLatency("ReadBatch", sw.ElapsedMilliseconds);
    }
}

4.4 WriteAsync

public async Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    _metrics.IncrementOperationCount("Write");
    var sw = Stopwatch.StartNew();
    try
    {
        var request = new WriteRequest { SessionId = _sessionId, Tag = address, Value = value };
        WriteResponse response = await ExecuteWithRetry(
            () => _client!.WriteAsync(request).AsTask(), cancellationToken);
        if (!response.Success)
            throw new InvalidOperationException($"Write failed: {response.Message}");
    }
    catch
    {
        _metrics.IncrementErrorCount("Write");
        throw;
    }
    finally
    {
        sw.Stop();
        _metrics.RecordLatency("Write", sw.ElapsedMilliseconds);
    }
}

4.5 WriteBatchAsync

public async Task WriteBatchAsync(IDictionary<string, TypedValue> values, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    _metrics.IncrementOperationCount("WriteBatch");
    var sw = Stopwatch.StartNew();
    try
    {
        var request = new WriteBatchRequest
        {
            SessionId = _sessionId,
            Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList()
        };
        WriteBatchResponse response = await ExecuteWithRetry(
            () => _client!.WriteBatchAsync(request).AsTask(), cancellationToken);
        if (!response.Success)
            throw new InvalidOperationException($"WriteBatch failed: {response.Message}");
    }
    catch
    {
        _metrics.IncrementErrorCount("WriteBatch");
        throw;
    }
    finally
    {
        sw.Stop();
        _metrics.RecordLatency("WriteBatch", sw.ElapsedMilliseconds);
    }
}

4.6 WriteBatchAndWaitAsync

public async Task<WriteBatchAndWaitResponse> WriteBatchAndWaitAsync(
    IDictionary<string, TypedValue> values, string flagTag, TypedValue flagValue,
    int timeoutMs = 5000, int pollIntervalMs = 100, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    var request = new WriteBatchAndWaitRequest
    {
        SessionId = _sessionId,
        Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList(),
        FlagTag = flagTag,
        FlagValue = flagValue,
        TimeoutMs = timeoutMs,
        PollIntervalMs = pollIntervalMs
    };
    return await ExecuteWithRetry(
        () => _client!.WriteBatchAndWaitAsync(request).AsTask(), cancellationToken);
}

4.7 CheckApiKeyAsync

public async Task<ApiKeyInfo> CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    var request = new CheckApiKeyRequest { ApiKey = apiKey };
    CheckApiKeyResponse response = await _client!.CheckApiKeyAsync(request);
    return new ApiKeyInfo { IsValid = response.IsValid, Description = response.Message };
}

4.8 ConvertVtqMessage helper

This converts the wire VtqMessage (v2 with TypedValue + QualityCode) to the domain Vtq:

private static Vtq ConvertVtqMessage(VtqMessage? msg)
{
    if (msg is null)
        return new Vtq(null, DateTime.UtcNow, Quality.Bad);

    object? value = ExtractTypedValue(msg.Value);
    DateTime timestamp = msg.TimestampUtcTicks > 0
        ? new DateTime(msg.TimestampUtcTicks, DateTimeKind.Utc)
        : DateTime.UtcNow;
    Quality quality = QualityExtensions.FromStatusCode(msg.Quality?.StatusCode ?? 0x80000000u);
    return new Vtq(value, timestamp, quality);
}

private static object? ExtractTypedValue(TypedValue? tv)
{
    if (tv is null) return null;
    // Switch on whichever oneof-style property is set
    // The exact property names depend on the Phase 1 code-first contract design
    // e.g., tv.BoolValue, tv.Int32Value, tv.DoubleValue, tv.StringValue, etc.
    // Return the native .NET value directly — no string conversions
    ...
}

Important: The exact shape of TypedValue in code-first contracts depends on Phase 1's implementation. Phase 1 should have defined a discriminator pattern (e.g., ValueCase enum or nullable properties with a convention). Adapt ExtractTypedValue to whatever pattern was chosen. The key rule: no string heuristics.

4.9 ExecuteWithRetry helper

private async Task<T> ExecuteWithRetry<T>(Func<Task<T>> operation, CancellationToken ct)
{
    if (_resiliencePipeline is not null)
    {
        return await _resiliencePipeline.ExecuteAsync(
            async token => await operation(), ct);
    }
    return await operation();
}

4.10 EnsureConnected, Dispose, DisposeAsync

private void EnsureConnected()
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    if (!IsConnected)
        throw new InvalidOperationException("Client is not connected. Call ConnectAsync first.");
}

public void Dispose()
{
    if (_disposed) return;
    _disposed = true;
    _keepAliveTimer?.Dispose();
    _channel?.Dispose();
    _connectionLock.Dispose();
}

public async ValueTask DisposeAsync()
{
    if (_disposed) return;
    try { await DisconnectAsync(); } catch { /* swallow */ }
    Dispose();
}

4.11 IsConnectedAsync

public Task<bool> IsConnectedAsync() => Task.FromResult(IsConnected);

4.12 GetMetrics

public Dictionary<string, object> GetMetrics() => _metrics.GetSnapshot();

4.13 Verify build

ssh windev "cd C:\src\lmxproxy && dotnet build src/ZB.MOM.WW.LmxProxy.Client"

Step 5: LmxProxyClient.Connection

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs

Partial class containing ConnectAsync, DisconnectAsync, keep-alive, MarkDisconnectedAsync, BuildEndpointUri.

5.1 ConnectAsync

  1. Acquire _connectionLock.
  2. Throw ObjectDisposedException if disposed.
  3. Return early if already connected.
  4. Build endpoint URI via BuildEndpointUri().
  5. Create channel: GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger).
  6. Create code-first client: channel.CreateGrpcService<IScadaService>() (from ProtoBuf.Grpc.Client).
  7. Send ConnectRequest with ClientId = $"ScadaBridge-{Guid.NewGuid():N}" and ApiKey = _apiKey ?? string.Empty.
  8. If !response.Success, dispose channel and throw.
  9. Store channel, client, sessionId. Set _isConnected = true.
  10. Call StartKeepAlive().
  11. On failure, reset all state and rethrow.
  12. Release lock in finally.

5.2 DisconnectAsync

  1. Acquire _connectionLock.
  2. Stop keep-alive.
  3. If client and session exist, send DisconnectRequest. Swallow exceptions.
  4. Clear client, sessionId, isConnected. Dispose channel.
  5. Release lock.

5.3 Keep-alive timer

  • StartKeepAlive(): creates Timer with _keepAliveInterval (30s) interval.
  • Timer callback: sends GetConnectionStateRequest. On failure: stops timer, calls MarkDisconnectedAsync(ex).
  • StopKeepAlive(): disposes timer, nulls it.

5.4 MarkDisconnectedAsync

  1. If disposed, return.
  2. Acquire _connectionLock, set _isConnected = false, clear client/sessionId, dispose channel. Release lock.
  3. Copy and clear _activeSubscriptions under _subscriptionLock.
  4. Dispose each subscription (swallow errors).
  5. Log warning with the exception.

5.5 BuildEndpointUri

private Uri BuildEndpointUri()
{
    string scheme = _tlsConfiguration?.UseTls == true ? Uri.UriSchemeHttps : Uri.UriSchemeHttp;
    return new UriBuilder { Scheme = scheme, Host = _host, Port = _port }.Uri;
}

5.6 Verify build

ssh windev "cd C:\src\lmxproxy && dotnet build src/ZB.MOM.WW.LmxProxy.Client"

Step 6: LmxProxyClient.CodeFirstSubscription

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.CodeFirstSubscription.cs

Nested class inside LmxProxyClient implementing ISubscription.

6.1 CodeFirstSubscription class

private class CodeFirstSubscription : ISubscription
{
    private readonly IScadaService _client;
    private readonly string _sessionId;
    private readonly List<string> _tags;
    private readonly Action<string, Vtq> _onUpdate;
    private readonly Action<Exception>? _onStreamError;
    private readonly ILogger<LmxProxyClient> _logger;
    private readonly Action<ISubscription>? _onDispose;
    private readonly CancellationTokenSource _cts = new();
    private Task? _processingTask;
    private bool _disposed;
    private bool _streamErrorFired;

Constructor takes all of these. StartAsync stores _processingTask = ProcessUpdatesAsync(cancellationToken).

6.2 ProcessUpdatesAsync

private async Task ProcessUpdatesAsync(CancellationToken cancellationToken)
{
    try
    {
        var request = new SubscribeRequest
        {
            SessionId = _sessionId,
            Tags = _tags,
            SamplingMs = 1000
        };
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);

        await foreach (VtqMessage vtqMsg in _client.SubscribeAsync(request, linkedCts.Token))
        {
            try
            {
                Vtq vtq = ConvertVtqMessage(vtqMsg);  // static method from outer class
                _onUpdate(vtqMsg.Tag, vtq);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing subscription update for {Tag}", vtqMsg.Tag);
            }
        }
    }
    catch (OperationCanceledException) when (_cts.IsCancellationRequested || cancellationToken.IsCancellationRequested)
    {
        _logger.LogDebug("Subscription cancelled");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error in subscription processing");
        FireStreamError(ex);
    }
    finally
    {
        if (!_disposed)
        {
            _disposed = true;
            _onDispose?.Invoke(this);
        }
    }
}

private void FireStreamError(Exception ex)
{
    if (_streamErrorFired) return;
    _streamErrorFired = true;
    try { _onStreamError?.Invoke(ex); }
    catch (Exception cbEx) { _logger.LogWarning(cbEx, "onStreamError callback threw"); }
}

Key difference from v1: The ConvertVtqMessage now handles TypedValue + QualityCode natively instead of parsing strings. Also, _onStreamError callback is invoked exactly once on stream termination (per Component-Client.md section 5.1).

6.3 DisposeAsync and Dispose

DisposeAsync(): Cancel CTS, await _processingTask (swallow errors), dispose CTS. 5-second timeout guard.

Dispose(): Calls DisposeAsync() synchronously with Task.Wait(TimeSpan.FromSeconds(5)).

6.4 Verify build

ssh windev "cd C:\src\lmxproxy && dotnet build src/ZB.MOM.WW.LmxProxy.Client"

Step 7: LmxProxyClient.ClientMetrics

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ClientMetrics.cs

Internal class. Already exists in v1 reference. Rewrite for v2 with p99 support.

internal class ClientMetrics
{
    private readonly ConcurrentDictionary<string, long> _operationCounts = new();
    private readonly ConcurrentDictionary<string, long> _errorCounts = new();
    private readonly ConcurrentDictionary<string, List<long>> _latencies = new();
    private readonly Lock _latencyLock = new();

    public void IncrementOperationCount(string operation) { ... }
    public void IncrementErrorCount(string operation) { ... }
    public void RecordLatency(string operation, long milliseconds) { ... }
    public Dictionary<string, object> GetSnapshot() { ... }
}

RecordLatency: Under _latencyLock, add to list. If count > 1000, RemoveAt(0).

GetSnapshot: Returns dictionary with keys {op}_count, {op}_errors, {op}_avg_latency_ms, {op}_p95_latency_ms, {op}_p99_latency_ms.

GetPercentile(List<long> values, int percentile): Sort, compute index as (int)Math.Ceiling(percentile / 100.0 * sorted.Count) - 1, clamp with Math.Max(0, ...).

Step 8: LmxProxyClient.ApiKeyInfo

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ApiKeyInfo.cs

Simple DTO returned by CheckApiKeyAsync:

namespace ZB.MOM.WW.LmxProxy.Client;

public partial class LmxProxyClient
{
    /// <summary>
    /// Result of an API key validation check.
    /// </summary>
    public class ApiKeyInfo
    {
        public bool IsValid { get; init; }
        public string? Role { get; init; }
        public string? Description { get; init; }
    }
}

Step 9: LmxProxyClient.ISubscription

File: src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ISubscription.cs

namespace ZB.MOM.WW.LmxProxy.Client;

public partial class LmxProxyClient
{
    /// <summary>
    /// Represents an active tag subscription. Dispose to unsubscribe.
    /// </summary>
    public interface ISubscription : IDisposable
    {
        /// <summary>Asynchronous disposal with cancellation support.</summary>
        Task DisposeAsync();
    }
}

Step 10: Unit Tests

Project: tests/ZB.MOM.WW.LmxProxy.Client.Tests/

Create if not exists:

ssh windev "cd C:\src\lmxproxy && dotnet new xunit -n ZB.MOM.WW.LmxProxy.Client.Tests -o tests/ZB.MOM.WW.LmxProxy.Client.Tests --framework net10.0"

Csproj for tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj:

  • <TargetFramework>net10.0</TargetFramework>
  • <ProjectReference Include="..\..\src\ZB.MOM.WW.LmxProxy.Client\ZB.MOM.WW.LmxProxy.Client.csproj" />
  • <PackageReference Include="xunit" Version="2.9.3" />
  • <PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
  • <PackageReference Include="NSubstitute" Version="5.3.0" />
  • <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />

Add to solution ZB.MOM.WW.LmxProxy.slnx:

<Folder Name="/tests/">
    <Project Path="tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj" />
</Folder>

10.1 Connection Lifecycle Tests

File: tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientConnectionTests.cs

Mock IScadaService using NSubstitute.

public class LmxProxyClientConnectionTests
{
    [Fact]
    public async Task ConnectAsync_EstablishesSessionAndStartsKeepAlive()

    [Fact]
    public async Task ConnectAsync_ThrowsWhenServerReturnsFailure()

    [Fact]
    public async Task DisconnectAsync_SendsDisconnectAndClearsState()

    [Fact]
    public async Task IsConnectedAsync_ReturnsFalseBeforeConnect()

    [Fact]
    public async Task IsConnectedAsync_ReturnsTrueAfterConnect()

    [Fact]
    public async Task KeepAliveFailure_MarksDisconnected()
}

Note: Testing the keep-alive requires either waiting 30s (too slow) or making the interval configurable for tests. Consider passing the interval as an internal constructor parameter or using a test-only subclass. Alternatively, test MarkDisconnectedAsync directly.

10.2 Read/Write Tests

File: tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientReadWriteTests.cs

public class LmxProxyClientReadWriteTests
{
    [Fact]
    public async Task ReadAsync_ReturnsVtqFromResponse()
    // Mock ReadAsync to return a VtqMessage with TypedValue.DoubleValue = 42.5
    // Verify returned Vtq.Value is 42.5 (double)

    [Fact]
    public async Task ReadAsync_ThrowsOnFailureResponse()

    [Fact]
    public async Task ReadBatchAsync_ReturnsDictionaryOfVtqs()

    [Fact]
    public async Task WriteAsync_SendsTypedValueDirectly()
    // Verify the WriteRequest.Value is the TypedValue passed in, not a string

    [Fact]
    public async Task WriteBatchAsync_SendsAllItems()

    [Fact]
    public async Task WriteBatchAndWaitAsync_ReturnsResponse()
}

10.3 Subscription Tests

File: tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientSubscriptionTests.cs

public class LmxProxyClientSubscriptionTests
{
    [Fact]
    public async Task SubscribeAsync_InvokesCallbackForEachUpdate()

    [Fact]
    public async Task SubscribeAsync_InvokesStreamErrorOnFailure()

    [Fact]
    public async Task SubscribeAsync_DisposeStopsProcessing()
}

10.4 TypedValue Conversion Tests

File: tests/ZB.MOM.WW.LmxProxy.Client.Tests/TypedValueConversionTests.cs

public class TypedValueConversionTests
{
    [Fact] public void ConvertVtqMessage_ExtractsBoolValue()
    [Fact] public void ConvertVtqMessage_ExtractsInt32Value()
    [Fact] public void ConvertVtqMessage_ExtractsInt64Value()
    [Fact] public void ConvertVtqMessage_ExtractsFloatValue()
    [Fact] public void ConvertVtqMessage_ExtractsDoubleValue()
    [Fact] public void ConvertVtqMessage_ExtractsStringValue()
    [Fact] public void ConvertVtqMessage_ExtractsDateTimeValue()
    [Fact] public void ConvertVtqMessage_HandlesNullTypedValue()
    [Fact] public void ConvertVtqMessage_HandlesNullMessage()
    [Fact] public void ConvertVtqMessage_MapsQualityCodeCorrectly()
    [Fact] public void ConvertVtqMessage_GoodQualityCode()
    [Fact] public void ConvertVtqMessage_BadQualityCode()
    [Fact] public void ConvertVtqMessage_UncertainQualityCode()
}

10.5 Metrics Tests

File: tests/ZB.MOM.WW.LmxProxy.Client.Tests/ClientMetricsTests.cs

public class ClientMetricsTests
{
    [Fact] public void IncrementOperationCount_Increments()
    [Fact] public void IncrementErrorCount_Increments()
    [Fact] public void RecordLatency_StoresValues()
    [Fact] public void RollingBuffer_CapsAt1000()
    [Fact] public void GetSnapshot_IncludesP95AndP99()
}

10.6 Run tests

ssh windev "cd C:\src\lmxproxy && dotnet test tests/ZB.MOM.WW.LmxProxy.Client.Tests --verbosity normal"

Step 11: Build Verification

ssh windev "cd C:\src\lmxproxy && dotnet build ZB.MOM.WW.LmxProxy.slnx && dotnet test --verbosity normal"

Completion Criteria

  • ILmxProxyClient interface updated for v2 (TypedValue parameters, onStreamError callback, CheckApiKeyAsync)
  • LmxProxyClient.cs — main file with Read/Write/WriteBatch/WriteBatchAndWait/CheckApiKey using v2 TypedValue
  • LmxProxyClient.Connection.cs — ConnectAsync, DisconnectAsync, keep-alive (30s), MarkDisconnectedAsync
  • LmxProxyClient.CodeFirstSubscription.cs — IAsyncEnumerable processing, onStreamError callback, 5s dispose timeout
  • LmxProxyClient.ClientMetrics.cs — per-op counts/errors/latency, 1000-sample buffer, p95/p99
  • LmxProxyClient.ApiKeyInfo.cs — simple DTO
  • LmxProxyClient.ISubscription.cs — IDisposable + DisposeAsync
  • ClientTlsConfiguration.cs — all properties present
  • Security/GrpcChannelFactory.cs — TLS 1.2/1.3, cert validation, custom CA, self-signed support
  • No string serialization heuristics anywhere in Client code
  • ConvertVtqMessage extracts native TypedValue without parsing
  • Polly v8 ResiliencePipeline for retry (not v7 IAsyncPolicy)
  • All unit tests pass
  • Solution builds cleanly