feat(dcl): RealMxGatewayClient over ZB.MOM.WW.MxGateway.Client

Seam implementation wrapping the gateway client + GalaxyRepositoryClient:
OpenSession/Register, AddItem+Advise subscribe, ReadBulk/WriteBulk with handle
tracking, StreamEvents loop with worker_sequence resume + OPC-style quality
mapping, and Galaxy BrowseChildren mapping (objects keyed by gobject id,
attributes by full tag reference). Only type touching the generated contracts.
This commit is contained in:
Joseph Doherty
2026-05-29 07:55:32 -04:00
parent 0a693e0be9
commit 20c24ef260
@@ -0,0 +1,267 @@
using System.Collections.Concurrent;
using System.Globalization;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.MxGateway.Client;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
/// <summary>
/// Production <see cref="IMxGatewayClient"/> implementation over the
/// <c>ZB.MOM.WW.MxGateway.Client</c> NuGet package. This is the only type in the
/// Data Connection Layer that references the generated gRPC/protobuf contracts;
/// the adapter and its tests run entirely against the neutral seam.
/// </summary>
public sealed class RealMxGatewayClient : IMxGatewayClient
{
private readonly ILogger<RealMxGatewayClient> _logger;
private readonly ILoggerFactory? _loggerFactory;
private MxGatewayClient? _client;
private GalaxyRepositoryClient? _galaxy;
private MxGatewaySession? _session;
private int _serverHandle;
private int _writeUserId;
private int _readTimeoutMs;
private ulong _lastSeq;
// tag ↔ MXAccess item handle, maintained across subscribe/write.
private readonly ConcurrentDictionary<string, int> _tagToHandle = new();
private readonly ConcurrentDictionary<int, string> _handleToTag = new();
/// <summary>Initializes a new instance of <see cref="RealMxGatewayClient"/>.</summary>
/// <param name="loggerFactory">Logger factory shared with the gateway client.</param>
public RealMxGatewayClient(ILoggerFactory? loggerFactory)
{
_loggerFactory = loggerFactory;
_logger = (loggerFactory ?? Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
.CreateLogger<RealMxGatewayClient>();
}
/// <inheritdoc />
public async Task ConnectAsync(MxGatewayConnectionOptions options, CancellationToken ct = default)
{
_writeUserId = options.WriteUserId;
_readTimeoutMs = options.ReadTimeoutMs;
var clientOptions = new MxGatewayClientOptions
{
Endpoint = new Uri(options.Endpoint),
ApiKey = options.ApiKey,
UseTls = options.UseTls,
CaCertificatePath = options.CaFile,
ServerNameOverride = options.ServerName,
LoggerFactory = _loggerFactory,
};
_client = MxGatewayClient.Create(clientOptions);
_galaxy = GalaxyRepositoryClient.Create(clientOptions);
_session = await _client.OpenSessionAsync(cancellationToken: ct).ConfigureAwait(false);
_serverHandle = await _session.RegisterAsync(options.ClientName, ct).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task DisconnectAsync(CancellationToken ct = default)
{
if (_session is not null)
await _session.CloseAsync(ct).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<string> SubscribeAsync(string tagPath, CancellationToken ct = default)
{
var handle = await GetOrAddItemHandleAsync(tagPath, ct).ConfigureAwait(false);
await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false);
return handle.ToString(CultureInfo.InvariantCulture);
}
/// <inheritdoc />
public async Task UnsubscribeAsync(string subscriptionId, CancellationToken ct = default)
{
if (!int.TryParse(subscriptionId, NumberStyles.Integer, CultureInfo.InvariantCulture, out var handle))
return;
await _session!.UnAdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false);
await _session.RemoveItemAsync(_serverHandle, handle, ct).ConfigureAwait(false);
if (_handleToTag.TryRemove(handle, out var tag))
_tagToHandle.TryRemove(tag, out _);
}
/// <inheritdoc />
public async Task<IReadOnlyList<MxReadOutcome>> ReadAsync(IReadOnlyList<string> tagPaths, CancellationToken ct = default)
{
var results = await _session!
.ReadBulkAsync(_serverHandle, tagPaths, TimeSpan.FromMilliseconds(_readTimeoutMs), ct)
.ConfigureAwait(false);
return results.Select(r => new MxReadOutcome(
r.TagAddress,
r.WasSuccessful,
r.WasSuccessful ? r.Value?.ToClrValue() : null,
MapQuality(r.Quality, r.Statuses),
r.SourceTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow,
r.WasSuccessful ? null : r.ErrorMessage)).ToList();
}
/// <inheritdoc />
public async Task<IReadOnlyList<MxWriteOutcome>> WriteAsync(IReadOnlyList<(string TagPath, object? Value)> writes, CancellationToken ct = default)
{
// Build entries in request order; remember the tag for each handle so the
// per-handle BulkWriteResult can be mapped back to its tag.
var entries = new List<WriteBulkEntry>(writes.Count);
var orderedTags = new List<string>(writes.Count);
foreach (var (tag, value) in writes)
{
var handle = await GetOrAddItemHandleAsync(tag, ct).ConfigureAwait(false);
entries.Add(new WriteBulkEntry
{
ItemHandle = handle,
Value = ToMxValue(value),
UserId = _writeUserId,
});
orderedTags.Add(tag);
}
var results = await _session!.WriteBulkAsync(_serverHandle, entries, ct).ConfigureAwait(false);
// Results are returned in request order; pair by index back to the tags.
return results.Select((r, i) => new MxWriteOutcome(
i < orderedTags.Count ? orderedTags[i] : (_handleToTag.TryGetValue(r.ItemHandle, out var t) ? t : ""),
r.WasSuccessful,
r.WasSuccessful ? null : r.ErrorMessage)).ToList();
}
/// <inheritdoc />
public async Task<(IReadOnlyList<MxBrowseChild> Children, bool Truncated)> BrowseChildrenAsync(string? parentNodeId, CancellationToken ct = default)
{
var request = new BrowseChildrenRequest { IncludeAttributes = true };
// Object NodeIds are the Galaxy gobject id (encoded as a string); attribute
// NodeIds are FullTagReference leaves and never arrive here as a parent.
if (!string.IsNullOrEmpty(parentNodeId)
&& int.TryParse(parentNodeId, NumberStyles.Integer, CultureInfo.InvariantCulture, out var gobjectId))
{
request.ParentGobjectId = gobjectId;
}
BrowseChildrenReply reply;
try
{
reply = await _galaxy!.BrowseChildrenRawAsync(request, ct).ConfigureAwait(false);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
{
throw new ConnectionNotConnectedException($"MxGateway repository unavailable: {ex.Status.Detail}");
}
var children = new List<MxBrowseChild>();
for (var i = 0; i < reply.Children.Count; i++)
{
var obj = reply.Children[i];
var hasChildren = i < reply.ChildHasChildren.Count && reply.ChildHasChildren[i];
// Navigable container node, keyed by gobject id.
children.Add(new MxBrowseChild(
obj.GobjectId.ToString(CultureInfo.InvariantCulture),
string.IsNullOrEmpty(obj.TagName) ? obj.ContainedName : obj.TagName,
BrowseNodeClass.Object,
hasChildren || obj.Attributes.Count > 0));
// Selectable attribute leaves, keyed by their full tag reference.
foreach (var attr in obj.Attributes)
{
children.Add(new MxBrowseChild(
attr.FullTagReference,
attr.AttributeName,
BrowseNodeClass.Variable,
false));
}
}
return (children, !string.IsNullOrEmpty(reply.NextPageToken));
}
/// <inheritdoc />
public async Task RunEventLoopAsync(Action<MxValueUpdate> onUpdate, CancellationToken ct = default)
{
await foreach (var ev in _session!.StreamEventsAsync(_lastSeq, ct).ConfigureAwait(false))
{
_lastSeq = ev.WorkerSequence;
if (ev.Family != MxEventFamily.OnDataChange)
continue;
if (!_handleToTag.TryGetValue(ev.ItemHandle, out var tag))
continue;
onUpdate(new MxValueUpdate(
tag,
ev.Value?.ToClrValue(),
MapQuality(ev.Quality, ev.Statuses),
ev.SourceTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow));
}
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_session is not null) await _session.DisposeAsync().ConfigureAwait(false);
if (_client is not null) await _client.DisposeAsync().ConfigureAwait(false);
if (_galaxy is not null) await _galaxy.DisposeAsync().ConfigureAwait(false);
}
private async Task<int> GetOrAddItemHandleAsync(string tagPath, CancellationToken ct)
{
if (_tagToHandle.TryGetValue(tagPath, out var existing))
return existing;
var handle = await _session!.AddItemAsync(_serverHandle, tagPath, ct).ConfigureAwait(false);
_tagToHandle[tagPath] = handle;
_handleToTag[handle] = tagPath;
return handle;
}
/// <summary>
/// Maps MXAccess quality. A failing status proxy is authoritative bad; otherwise
/// the OPC-style quality byte: ≥192 Good, ≥64 Uncertain, else Bad.
/// </summary>
private static QualityCode MapQuality(int quality, IEnumerable<MxStatusProxy> statuses)
{
if (statuses.Any(s => !s.IsSuccess()))
return QualityCode.Bad;
return quality switch
{
>= 192 => QualityCode.Good,
>= 64 => QualityCode.Uncertain,
_ => QualityCode.Bad,
};
}
private static MxValue ToMxValue(object? value) => value switch
{
null => new MxValue { IsNull = true },
bool b => b.ToMxValue(),
int i => i.ToMxValue(),
long l => l.ToMxValue(),
float f => f.ToMxValue(),
double d => d.ToMxValue(),
string s => s.ToMxValue(),
DateTimeOffset dto => dto.ToMxValue(),
DateTime dt => dt.ToMxValue(),
// Fall back to invariant string for any other CLR type.
_ => Convert.ToString(value, CultureInfo.InvariantCulture)!.ToMxValue(),
};
}
/// <summary>Builds <see cref="RealMxGatewayClient"/> instances.</summary>
public sealed class RealMxGatewayClientFactory : IMxGatewayClientFactory
{
private readonly ILoggerFactory? _loggerFactory;
/// <summary>Initializes a new factory.</summary>
/// <param name="loggerFactory">Logger factory passed to each created client.</param>
public RealMxGatewayClientFactory(ILoggerFactory? loggerFactory) => _loggerFactory = loggerFactory;
/// <inheritdoc />
public IMxGatewayClient Create() => new RealMxGatewayClient(_loggerFactory);
}