using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// /// Production — combines the SQL-backed /// for Discover with the live MXAccess /// for Read / Write / Subscribe. History stays bad-coded /// until the Wonderware Historian SDK plugin loader (Task B.1.h) lands. Alarms come from /// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up /// (the v1 alarm subsystem is its own subtree). /// public sealed class MxAccessGalaxyBackend : IGalaxyBackend { private readonly GalaxyRepository _repository; private readonly MxAccessClient _mx; private long _nextSessionId; private long _nextSubscriptionId; // Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them. private readonly System.Collections.Concurrent.ConcurrentDictionary> _subs = new(); // Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many). private readonly System.Collections.Concurrent.ConcurrentDictionary> _refToSubs = new(System.StringComparer.OrdinalIgnoreCase); public event System.EventHandler? OnDataChange; #pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up public event System.EventHandler? OnAlarmEvent; public event System.EventHandler? OnHostStatusChanged; #pragma warning restore CS0067 public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) { _repository = repository; _mx = mx; } public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) { try { await _mx.ConnectAsync(); return new OpenSessionResponse { Success = true, SessionId = Interlocked.Increment(ref _nextSessionId) }; } catch (Exception ex) { return new OpenSessionResponse { Success = false, Error = $"MXAccess connect failed: {ex.Message}" }; } } public async Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct) { await _mx.DisconnectAsync(); } public async Task DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct) { try { var hierarchy = await _repository.GetHierarchyAsync(ct).ConfigureAwait(false); var attributes = await _repository.GetAttributesAsync(ct).ConfigureAwait(false); var attrsByGobject = attributes .GroupBy(a => a.GobjectId) .ToDictionary(g => g.Key, g => g.Select(MapAttribute).ToArray()); var nameByGobject = hierarchy.ToDictionary(o => o.GobjectId, o => o.TagName); var objects = hierarchy.Select(o => new GalaxyObjectInfo { ContainedName = string.IsNullOrEmpty(o.ContainedName) ? o.TagName : o.ContainedName, TagName = o.TagName, ParentContainedName = o.ParentGobjectId != 0 && nameByGobject.TryGetValue(o.ParentGobjectId, out var p) ? p : null, TemplateCategory = MapCategory(o.CategoryId), Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty(), }).ToArray(); return new DiscoverHierarchyResponse { Success = true, Objects = objects }; } catch (Exception ex) { return new DiscoverHierarchyResponse { Success = false, Error = ex.Message, Objects = Array.Empty() }; } } public async Task ReadValuesAsync(ReadValuesRequest req, CancellationToken ct) { if (!_mx.IsConnected) return new ReadValuesResponse { Success = false, Error = "Not connected", Values = Array.Empty() }; var results = new List(req.TagReferences.Length); foreach (var reference in req.TagReferences) { try { var vtq = await _mx.ReadAsync(reference, TimeSpan.FromSeconds(5), ct); results.Add(ToWire(reference, vtq)); } catch (Exception ex) { results.Add(new GalaxyDataValue { TagReference = reference, StatusCode = 0x80020000u, // Bad_InternalError ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), ValueBytes = MessagePackSerializer.Serialize(ex.Message), }); } } return new ReadValuesResponse { Success = true, Values = results.ToArray() }; } public async Task WriteValuesAsync(WriteValuesRequest req, CancellationToken ct) { var results = new List(req.Writes.Length); foreach (var w in req.Writes) { try { // Decode the value back from the MessagePack bytes the Proxy sent. var value = w.ValueBytes is null ? null : MessagePackSerializer.Deserialize(w.ValueBytes); var ok = await _mx.WriteAsync(w.TagReference, value!); results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError Error = ok ? null : "MXAccess runtime reported write failure", }); } catch (Exception ex) { results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0x80020000u, Error = ex.Message }); } } return new WriteValuesResponse { Results = results.ToArray() }; } public async Task SubscribeAsync(SubscribeRequest req, CancellationToken ct) { var sid = Interlocked.Increment(ref _nextSubscriptionId); try { foreach (var tag in req.TagReferences) { _refToSubs.AddOrUpdate(tag, _ => new System.Collections.Concurrent.ConcurrentBag { sid }, (_, bag) => { bag.Add(sid); return bag; }); // The MXAccess SubscribeAsync only takes one callback per tag; the same callback // fires for every active subscription of that tag — we fan out by SubscriptionId. await _mx.SubscribeAsync(tag, OnTagValueChanged); } _subs[sid] = req.TagReferences; return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs }; } catch (Exception ex) { return new SubscribeResponse { Success = false, Error = ex.Message }; } } public async Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct) { if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return; foreach (var r in refs) { // Drop this subscription from the reverse map; only unsubscribe from MXAccess if no // other subscription is still listening (multiple Proxy subs may share a tag). _refToSubs.TryGetValue(r, out var bag); if (bag is not null) { var remaining = new System.Collections.Concurrent.ConcurrentBag( bag.Where(id => id != req.SubscriptionId)); if (remaining.IsEmpty) { _refToSubs.TryRemove(r, out _); await _mx.UnsubscribeAsync(r); } else { _refToSubs[r] = remaining; } } } } /// /// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in /// a and raises once per /// subscription that includes this tag — the IPC sink translates that into outbound /// OnDataChangeNotification frames. /// private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq) { if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return; var wireValue = ToWire(fullReference, vtq); // Emit one notification per active SubscriptionId for this tag — the Proxy fans out to // each ISubscribable consumer based on the SubscriptionId in the payload. foreach (var sid in bag.Distinct()) { OnDataChange?.Invoke(this, new OnDataChangeNotification { SubscriptionId = sid, Values = new[] { wireValue }, }); } } public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) => Task.FromResult(new HistoryReadResponse { Success = false, Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)", Tags = Array.Empty(), }); public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new() { TagReference = reference, ValueBytes = vtq.Value is null ? null : MessagePackSerializer.Serialize(vtq.Value), ValueMessagePackType = 0, StatusCode = vtq.Quality >= 192 ? 0u : 0x40000000u, // Good vs Uncertain placeholder SourceTimestampUtcUnixMs = new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, MxDataType = row.MxDataType, IsArray = row.IsArray, ArrayDim = row.ArrayDimension is int d and > 0 ? (uint)d : null, SecurityClassification = row.SecurityClassification, IsHistorized = row.IsHistorized, }; private static string MapCategory(int categoryId) => categoryId switch { 1 => "$WinPlatform", 3 => "$AppEngine", 4 => "$Area", 10 => "$UserDefined", 11 => "$ApplicationObject", 13 => "$Area", 17 => "$DeviceIntegration", 24 => "$ViewEngine", 26 => "$ViewApp", _ => $"category-{categoryId}", }; }