From 20c24ef260d56e4525668969eb46bce001b4e8f9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 29 May 2026 07:55:32 -0400 Subject: [PATCH] feat(dcl): RealMxGatewayClient over ZB.MOM.WW.MxGateway.Client Seam implementation wrapping the gateway client + GalaxyRepositoryClient: OpenSession/Register, AddItem+Advise subscribe, ReadBulk/WriteBulk with handle tracking, StreamEvents loop with worker_sequence resume + OPC-style quality mapping, and Galaxy BrowseChildren mapping (objects keyed by gobject id, attributes by full tag reference). Only type touching the generated contracts. --- .../Adapters/RealMxGatewayClient.cs | 267 ++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs new file mode 100644 index 00000000..d34a9968 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs @@ -0,0 +1,267 @@ +using System.Collections.Concurrent; +using System.Globalization; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.MxGateway.Client; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; + +/// +/// Production implementation over the +/// ZB.MOM.WW.MxGateway.Client NuGet package. This is the only type in the +/// Data Connection Layer that references the generated gRPC/protobuf contracts; +/// the adapter and its tests run entirely against the neutral seam. +/// +public sealed class RealMxGatewayClient : IMxGatewayClient +{ + private readonly ILogger _logger; + private readonly ILoggerFactory? _loggerFactory; + + private MxGatewayClient? _client; + private GalaxyRepositoryClient? _galaxy; + private MxGatewaySession? _session; + private int _serverHandle; + private int _writeUserId; + private int _readTimeoutMs; + private ulong _lastSeq; + + // tag ↔ MXAccess item handle, maintained across subscribe/write. + private readonly ConcurrentDictionary _tagToHandle = new(); + private readonly ConcurrentDictionary _handleToTag = new(); + + /// Initializes a new instance of . + /// Logger factory shared with the gateway client. + public RealMxGatewayClient(ILoggerFactory? loggerFactory) + { + _loggerFactory = loggerFactory; + _logger = (loggerFactory ?? Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance) + .CreateLogger(); + } + + /// + public async Task ConnectAsync(MxGatewayConnectionOptions options, CancellationToken ct = default) + { + _writeUserId = options.WriteUserId; + _readTimeoutMs = options.ReadTimeoutMs; + + var clientOptions = new MxGatewayClientOptions + { + Endpoint = new Uri(options.Endpoint), + ApiKey = options.ApiKey, + UseTls = options.UseTls, + CaCertificatePath = options.CaFile, + ServerNameOverride = options.ServerName, + LoggerFactory = _loggerFactory, + }; + + _client = MxGatewayClient.Create(clientOptions); + _galaxy = GalaxyRepositoryClient.Create(clientOptions); + _session = await _client.OpenSessionAsync(cancellationToken: ct).ConfigureAwait(false); + _serverHandle = await _session.RegisterAsync(options.ClientName, ct).ConfigureAwait(false); + } + + /// + public async Task DisconnectAsync(CancellationToken ct = default) + { + if (_session is not null) + await _session.CloseAsync(ct).ConfigureAwait(false); + } + + /// + public async Task SubscribeAsync(string tagPath, CancellationToken ct = default) + { + var handle = await GetOrAddItemHandleAsync(tagPath, ct).ConfigureAwait(false); + await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false); + return handle.ToString(CultureInfo.InvariantCulture); + } + + /// + public async Task UnsubscribeAsync(string subscriptionId, CancellationToken ct = default) + { + if (!int.TryParse(subscriptionId, NumberStyles.Integer, CultureInfo.InvariantCulture, out var handle)) + return; + + await _session!.UnAdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false); + await _session.RemoveItemAsync(_serverHandle, handle, ct).ConfigureAwait(false); + + if (_handleToTag.TryRemove(handle, out var tag)) + _tagToHandle.TryRemove(tag, out _); + } + + /// + public async Task> ReadAsync(IReadOnlyList tagPaths, CancellationToken ct = default) + { + var results = await _session! + .ReadBulkAsync(_serverHandle, tagPaths, TimeSpan.FromMilliseconds(_readTimeoutMs), ct) + .ConfigureAwait(false); + + return results.Select(r => new MxReadOutcome( + r.TagAddress, + r.WasSuccessful, + r.WasSuccessful ? r.Value?.ToClrValue() : null, + MapQuality(r.Quality, r.Statuses), + r.SourceTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow, + r.WasSuccessful ? null : r.ErrorMessage)).ToList(); + } + + /// + public async Task> WriteAsync(IReadOnlyList<(string TagPath, object? Value)> writes, CancellationToken ct = default) + { + // Build entries in request order; remember the tag for each handle so the + // per-handle BulkWriteResult can be mapped back to its tag. + var entries = new List(writes.Count); + var orderedTags = new List(writes.Count); + foreach (var (tag, value) in writes) + { + var handle = await GetOrAddItemHandleAsync(tag, ct).ConfigureAwait(false); + entries.Add(new WriteBulkEntry + { + ItemHandle = handle, + Value = ToMxValue(value), + UserId = _writeUserId, + }); + orderedTags.Add(tag); + } + + var results = await _session!.WriteBulkAsync(_serverHandle, entries, ct).ConfigureAwait(false); + + // Results are returned in request order; pair by index back to the tags. + return results.Select((r, i) => new MxWriteOutcome( + i < orderedTags.Count ? orderedTags[i] : (_handleToTag.TryGetValue(r.ItemHandle, out var t) ? t : ""), + r.WasSuccessful, + r.WasSuccessful ? null : r.ErrorMessage)).ToList(); + } + + /// + public async Task<(IReadOnlyList Children, bool Truncated)> BrowseChildrenAsync(string? parentNodeId, CancellationToken ct = default) + { + var request = new BrowseChildrenRequest { IncludeAttributes = true }; + // Object NodeIds are the Galaxy gobject id (encoded as a string); attribute + // NodeIds are FullTagReference leaves and never arrive here as a parent. + if (!string.IsNullOrEmpty(parentNodeId) + && int.TryParse(parentNodeId, NumberStyles.Integer, CultureInfo.InvariantCulture, out var gobjectId)) + { + request.ParentGobjectId = gobjectId; + } + + BrowseChildrenReply reply; + try + { + reply = await _galaxy!.BrowseChildrenRawAsync(request, ct).ConfigureAwait(false); + } + catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) + { + throw new ConnectionNotConnectedException($"MxGateway repository unavailable: {ex.Status.Detail}"); + } + + var children = new List(); + for (var i = 0; i < reply.Children.Count; i++) + { + var obj = reply.Children[i]; + var hasChildren = i < reply.ChildHasChildren.Count && reply.ChildHasChildren[i]; + // Navigable container node, keyed by gobject id. + children.Add(new MxBrowseChild( + obj.GobjectId.ToString(CultureInfo.InvariantCulture), + string.IsNullOrEmpty(obj.TagName) ? obj.ContainedName : obj.TagName, + BrowseNodeClass.Object, + hasChildren || obj.Attributes.Count > 0)); + + // Selectable attribute leaves, keyed by their full tag reference. + foreach (var attr in obj.Attributes) + { + children.Add(new MxBrowseChild( + attr.FullTagReference, + attr.AttributeName, + BrowseNodeClass.Variable, + false)); + } + } + + return (children, !string.IsNullOrEmpty(reply.NextPageToken)); + } + + /// + public async Task RunEventLoopAsync(Action onUpdate, CancellationToken ct = default) + { + await foreach (var ev in _session!.StreamEventsAsync(_lastSeq, ct).ConfigureAwait(false)) + { + _lastSeq = ev.WorkerSequence; + if (ev.Family != MxEventFamily.OnDataChange) + continue; + if (!_handleToTag.TryGetValue(ev.ItemHandle, out var tag)) + continue; + + onUpdate(new MxValueUpdate( + tag, + ev.Value?.ToClrValue(), + MapQuality(ev.Quality, ev.Statuses), + ev.SourceTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow)); + } + } + + /// + public async ValueTask DisposeAsync() + { + if (_session is not null) await _session.DisposeAsync().ConfigureAwait(false); + if (_client is not null) await _client.DisposeAsync().ConfigureAwait(false); + if (_galaxy is not null) await _galaxy.DisposeAsync().ConfigureAwait(false); + } + + private async Task GetOrAddItemHandleAsync(string tagPath, CancellationToken ct) + { + if (_tagToHandle.TryGetValue(tagPath, out var existing)) + return existing; + + var handle = await _session!.AddItemAsync(_serverHandle, tagPath, ct).ConfigureAwait(false); + _tagToHandle[tagPath] = handle; + _handleToTag[handle] = tagPath; + return handle; + } + + /// + /// Maps MXAccess quality. A failing status proxy is authoritative bad; otherwise + /// the OPC-style quality byte: ≥192 Good, ≥64 Uncertain, else Bad. + /// + private static QualityCode MapQuality(int quality, IEnumerable statuses) + { + if (statuses.Any(s => !s.IsSuccess())) + return QualityCode.Bad; + return quality switch + { + >= 192 => QualityCode.Good, + >= 64 => QualityCode.Uncertain, + _ => QualityCode.Bad, + }; + } + + private static MxValue ToMxValue(object? value) => value switch + { + null => new MxValue { IsNull = true }, + bool b => b.ToMxValue(), + int i => i.ToMxValue(), + long l => l.ToMxValue(), + float f => f.ToMxValue(), + double d => d.ToMxValue(), + string s => s.ToMxValue(), + DateTimeOffset dto => dto.ToMxValue(), + DateTime dt => dt.ToMxValue(), + // Fall back to invariant string for any other CLR type. + _ => Convert.ToString(value, CultureInfo.InvariantCulture)!.ToMxValue(), + }; +} + +/// Builds instances. +public sealed class RealMxGatewayClientFactory : IMxGatewayClientFactory +{ + private readonly ILoggerFactory? _loggerFactory; + + /// Initializes a new factory. + /// Logger factory passed to each created client. + public RealMxGatewayClientFactory(ILoggerFactory? loggerFactory) => _loggerFactory = loggerFactory; + + /// + public IMxGatewayClient Create() => new RealMxGatewayClient(_loggerFactory); +}