using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;

/// <summary>
/// Production <see cref="IGalaxyBackend"/>. Discover is served from the SQL-backed
/// <see cref="GalaxyRepository"/>; Read / Write / Subscribe go through the live
/// <see cref="MxAccessClient"/>. History calls are delegated to the optional
/// <see cref="IHistorianDataSource"/> and report failure when none was supplied.
/// Host status (gateway-level and per-platform probes) and alarm transitions are
/// forwarded through <see cref="OnHostStatusChanged"/> and <see cref="OnAlarmEvent"/>.
/// </summary>
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
{
    // OPC UA status codes written to the wire by this class.
    private const uint StatusGood = 0x00000000u;
    private const uint StatusUncertain = 0x40000000u;
    private const uint StatusBad = 0x80000000u;
    private const uint StatusBadInternalError = 0x80020000u;
    private const uint StatusBadNoData = 0x809B0000u;

    private readonly GalaxyRepository _repository;
    private readonly MxAccessClient _mx;
    private readonly IHistorianDataSource? _historian;
    private long _nextSessionId;
    private long _nextSubscriptionId;
    private int _disposed;

    // Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
    private readonly ConcurrentDictionary<long, string[]> _subs = new();

    // Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
    private readonly ConcurrentDictionary<string, ConcurrentBag<long>> _refToSubs =
        new(StringComparer.OrdinalIgnoreCase);

    // Serializes SubscribeAsync / UnsubscribeAsync so the two maps and the MXAccess advise
    // state change together. OnTagValueChanged only reads and never takes this gate.
    private readonly SemaphoreSlim _subscriptionGate = new(1, 1);

    public event EventHandler<OnDataChangeNotification>? OnDataChange;
    public event EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
    public event EventHandler<HostConnectivityStatus>? OnHostStatusChanged;

    private readonly EventHandler<bool> _onConnectionStateChanged;
    private readonly GalaxyRuntimeProbeManager _probeManager;

    // NOTE(review): event-args type name reconstructed from usage (TagName / NewState / AtUtc);
    // confirm against GalaxyRuntimeProbeManager.StateChanged.
    private readonly EventHandler<HostStateTransition> _onProbeStateChanged;

    private readonly GalaxyAlarmTracker _alarmTracker;

    // NOTE(review): event-args type name reconstructed from usage (AlarmTag / Transition /
    // Priority / DescAttrName / AtUtc); confirm against GalaxyAlarmTracker.TransitionRaised.
    private readonly EventHandler<AlarmTransition> _onAlarmTransition;

    // Cached during DiscoverAsync so SubscribeAlarmsAsync knows which attributes to advise.
    // One entry per IsAlarm=true attribute in the last discovered hierarchy.
    private readonly ConcurrentBag<string> _discoveredAlarmTags = new();

    /// <summary>
    /// Wires the backend to its collaborators and hooks the three event sources
    /// (MXAccess connection state, runtime probes, alarm tracker) so their transitions are
    /// re-raised as <see cref="OnHostStatusChanged"/> / <see cref="OnAlarmEvent"/>.
    /// </summary>
    /// <param name="repository">Source of the Galaxy hierarchy and attribute rows for Discover.</param>
    /// <param name="mx">MXAccess client used for connect, read, write and subscribe.</param>
    /// <param name="historian">Optional historian; when null every History* call reports failure.</param>
    public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
    {
        _repository = repository;
        _mx = mx;
        _historian = historian;

        // PR 8: gateway-level host-status push. When the MXAccess connection flips
        // connected↔disconnected, raise OnHostStatusChanged with a synthetic host entry named
        // after the client identity so top-level transport health is visible on its own.
        _onConnectionStateChanged = (_, connected) =>
        {
            OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
            {
                HostName = _mx.ClientName,
                RuntimeStatus = connected ? "Running" : "Stopped",
                LastObservedUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
            });
        };
        _mx.ConnectionStateChanged += _onConnectionStateChanged;

        // PR 13: per-platform runtime probes. Each probe transition is forwarded through the
        // same OnHostStatusChanged event, tagged with the host's TagName, so per-host health is
        // reported independently from the gateway-level status above.
        _probeManager = new GalaxyRuntimeProbeManager(
            subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
            unsubscribe: probe => _mx.UnsubscribeAsync(probe));
        _onProbeStateChanged = (_, t) =>
        {
            OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
            {
                HostName = t.TagName,
                RuntimeStatus = t.NewState switch
                {
                    HostRuntimeState.Running => "Running",
                    HostRuntimeState.Stopped => "Stopped",
                    _ => "Unknown",
                },
                LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
            });
        };
        _probeManager.StateChanged += _onProbeStateChanged;

        // PR 14: alarm subsystem. The tracker subscribes/writes through MXAccess and raises a
        // transition per lifecycle change; each one is re-raised as a GalaxyAlarmEvent with a
        // freshly generated EventId.
        _alarmTracker = new GalaxyAlarmTracker(
            subscribe: (tag, cb) => _mx.SubscribeAsync(tag, cb),
            unsubscribe: tag => _mx.UnsubscribeAsync(tag),
            write: (tag, v) => _mx.WriteAsync(tag, v));
        _onAlarmTransition = (_, t) => OnAlarmEvent?.Invoke(this, new GalaxyAlarmEvent
        {
            EventId = Guid.NewGuid().ToString("N"),
            ObjectTagName = t.AlarmTag,
            AlarmName = t.AlarmTag,
            Severity = t.Priority,
            StateTransition = t.Transition switch
            {
                AlarmStateTransition.Active => "Active",
                AlarmStateTransition.Acknowledged => "Acknowledged",
                AlarmStateTransition.Inactive => "Inactive",
                _ => "Unknown",
            },
            Message = t.DescAttrName ?? t.AlarmTag,
            UtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
        });
        _alarmTracker.TransitionRaised += _onAlarmTransition;
    }

    /// <summary>
    /// Exposed for tests. In production <see cref="DiscoverAsync"/> syncs this manager with
    /// the WinPlatform / AppEngine objects found in the hierarchy.
    /// </summary>
    internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;

    /// <summary>
    /// Connects MXAccess and hands out a new session id. Connect failures are reported in the
    /// response (<c>Success = false</c>) rather than thrown.
    /// </summary>
    public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
    {
        try
        {
            await _mx.ConnectAsync();
            return new OpenSessionResponse { Success = true, SessionId = Interlocked.Increment(ref _nextSessionId) };
        }
        catch (Exception ex)
        {
            return new OpenSessionResponse { Success = false, Error = $"MXAccess connect failed: {ex.Message}" };
        }
    }

    /// <summary>Disconnects MXAccess. Exceptions from the client propagate to the caller.</summary>
    public async Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct)
    {
        await _mx.DisconnectAsync();
    }

    /// <summary>
    /// Loads hierarchy and attributes from the repository and maps them to wire objects.
    /// As side effects it refreshes the cached alarm tag list used by
    /// <see cref="SubscribeAlarmsAsync"/> and syncs the runtime probe manager. Repository
    /// failures are reported in the response; probe-sync failures are ignored.
    /// </summary>
    public async Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct)
    {
        try
        {
            var hierarchy = await _repository.GetHierarchyAsync(ct).ConfigureAwait(false);
            var attributes = await _repository.GetAttributesAsync(ct).ConfigureAwait(false);

            var attrsByGobject = attributes
                .GroupBy(a => a.GobjectId)
                .ToDictionary(g => g.Key, g => g.Select(MapAttribute).ToArray());
            var nameByGobject = hierarchy.ToDictionary(o => o.GobjectId, o => o.TagName);

            var objects = hierarchy.Select(o => new GalaxyObjectInfo
            {
                ContainedName = string.IsNullOrEmpty(o.ContainedName) ? o.TagName : o.ContainedName,
                TagName = o.TagName,
                ParentContainedName = o.ParentGobjectId != 0 && nameByGobject.TryGetValue(o.ParentGobjectId, out var p) ? p : null,
                TemplateCategory = MapCategory(o.CategoryId),
                Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
            }).ToArray();

            // PR 14: cache alarm-bearing attribute full refs ("<TagName>.<AttributeName>") so
            // SubscribeAlarmsAsync can advise them on demand. Attributes whose owning object
            // is not in the hierarchy are dropped.
            var freshAlarmTags = attributes
                .Where(a => a.IsAlarm)
                .Select(a => nameByGobject.TryGetValue(a.GobjectId, out var tn) ? tn + "." + a.AttributeName : null)
                .Where(s => !string.IsNullOrWhiteSpace(s))
                .Cast<string>()
                .ToArray();
            while (_discoveredAlarmTags.TryTake(out _)) { }
            foreach (var t in freshAlarmTags) _discoveredAlarmTags.Add(t);

            // PR 13: sync the per-platform probe manager against the just-discovered hierarchy.
            // Best-effort — a probe failure must not fail Discover; the gateway-level
            // connection status still flows without it.
            try
            {
                var targets = hierarchy
                    .Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
                             || o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
                    .Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
                await _probeManager.SyncAsync(targets).ConfigureAwait(false);
            }
            catch
            {
                /* swallow — Discover succeeded; probes are a diagnostic enrichment */
            }

            return new DiscoverHierarchyResponse { Success = true, Objects = objects };
        }
        catch (Exception ex)
        {
            return new DiscoverHierarchyResponse { Success = false, Error = ex.Message, Objects = Array.Empty<GalaxyObjectInfo>() };
        }
    }

    /// <summary>
    /// Reads each requested tag in order with a 5-second per-tag timeout. A failing tag yields
    /// a Bad_InternalError entry carrying the exception message as its value; the overall
    /// response is still successful. Returns <c>Success = false</c> only when not connected.
    /// </summary>
    public async Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct)
    {
        if (!_mx.IsConnected)
            return new ReadValuesResponse { Success = false, Error = "Not connected", Values = Array.Empty<GalaxyDataValue>() };

        var results = new List<GalaxyDataValue>(req.TagReferences.Length);
        foreach (var reference in req.TagReferences)
        {
            try
            {
                var vtq = await _mx.ReadAsync(reference, TimeSpan.FromSeconds(5), ct);
                results.Add(ToWire(reference, vtq));
            }
            catch (Exception ex)
            {
                results.Add(new GalaxyDataValue
                {
                    TagReference = reference,
                    StatusCode = StatusBadInternalError,
                    ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                    ValueBytes = MessagePackSerializer.Serialize(ex.Message),
                });
            }
        }

        return new ReadValuesResponse { Success = true, Values = results.ToArray() };
    }

    /// <summary>
    /// Writes each value in order. Every write produces its own result: Good when MXAccess
    /// reports success, Bad_InternalError (with an error text) when it reports failure or throws.
    /// </summary>
    public async Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct)
    {
        var results = new List<WriteValueResult>(req.Writes.Length);
        foreach (var w in req.Writes)
        {
            try
            {
                // Decode the value back from the MessagePack bytes the Proxy sent.
                var value = w.ValueBytes is null ? null : MessagePackSerializer.Deserialize<object>(w.ValueBytes);
                var ok = await _mx.WriteAsync(w.TagReference, value!);
                results.Add(new WriteValueResult
                {
                    TagReference = w.TagReference,
                    StatusCode = ok ? StatusGood : StatusBadInternalError,
                    Error = ok ? null : "MXAccess runtime reported write failure",
                });
            }
            catch (Exception ex)
            {
                results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = StatusBadInternalError, Error = ex.Message });
            }
        }

        return new WriteValuesResponse { Results = results.ToArray() };
    }

    /// <summary>
    /// Registers a new subscription covering <c>req.TagReferences</c> and advises each tag on
    /// MXAccess. On any failure the partially-registered state is rolled back and the failure
    /// is reported in the response, so a subscription id the caller was told failed never
    /// produces data-change notifications.
    /// </summary>
    public async Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct)
    {
        var sid = Interlocked.Increment(ref _nextSubscriptionId);
        var registered = new List<string>();

        await _subscriptionGate.WaitAsync();
        try
        {
            foreach (var tag in req.TagReferences)
            {
                _refToSubs.AddOrUpdate(
                    tag,
                    _ => new ConcurrentBag<long> { sid },
                    (_, bag) => { bag.Add(sid); return bag; });
                registered.Add(tag);

                // The MXAccess SubscribeAsync only takes one callback per tag; the same callback
                // fires for every active subscription of that tag — we fan out by SubscriptionId.
                await _mx.SubscribeAsync(tag, OnTagValueChanged);
            }

            _subs[sid] = req.TagReferences;
            return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
        }
        catch (Exception ex)
        {
            // _subs[sid] was never recorded, so UnsubscribeAsync could not clean this up later.
            await RollbackSubscriptionAsync(sid, registered);
            return new SubscribeResponse { Success = false, Error = ex.Message };
        }
        finally
        {
            _subscriptionGate.Release();
        }
    }

    /// <summary>
    /// Removes a subscription. Unknown ids are ignored. Every tag of the subscription is
    /// released even if MXAccess fails on one of them; the first such failure is rethrown
    /// after cleanup completes.
    /// </summary>
    public async Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct)
    {
        await _subscriptionGate.WaitAsync();
        try
        {
            if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;

            ExceptionDispatchInfo? firstFailure = null;
            foreach (var tag in refs)
            {
                try
                {
                    await ReleaseTagAsync(tag, req.SubscriptionId);
                }
                catch (Exception ex)
                {
                    firstFailure ??= ExceptionDispatchInfo.Capture(ex);
                }
            }

            firstFailure?.Throw();
        }
        finally
        {
            _subscriptionGate.Release();
        }
    }

    /// <summary>
    /// Drops <paramref name="subscriptionId"/> from the reverse map entry of
    /// <paramref name="tag"/>; unadvises the tag on MXAccess only when no other subscription
    /// still references it. Must be called with the subscription gate held.
    /// </summary>
    private async Task ReleaseTagAsync(string tag, long subscriptionId)
    {
        if (!_refToSubs.TryGetValue(tag, out var bag) || bag is null) return;

        var remaining = new ConcurrentBag<long>(bag.Where(id => id != subscriptionId));
        if (remaining.IsEmpty)
        {
            _refToSubs.TryRemove(tag, out _);
            await _mx.UnsubscribeAsync(tag);
        }
        else
        {
            _refToSubs[tag] = remaining;
        }
    }

    /// <summary>
    /// Best-effort undo of a failed <see cref="SubscribeAsync"/>: releases every tag that was
    /// registered for the failed subscription id. Individual release failures are ignored so
    /// the original subscribe error is the one reported.
    /// </summary>
    private async Task RollbackSubscriptionAsync(long subscriptionId, IReadOnlyList<string> tags)
    {
        foreach (var tag in tags)
        {
            try
            {
                await ReleaseTagAsync(tag, subscriptionId);
            }
            catch
            {
                /* swallow — rollback is best-effort; the subscribe failure is what the caller sees */
            }
        }
    }

    /// <summary>
    /// MXAccess callback for every value change on a subscribed tag. Converts the value to
    /// the wire shape once and raises <see cref="OnDataChange"/> once per distinct
    /// subscription id registered for the tag.
    /// </summary>
    private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
    {
        if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;

        var wireValue = ToWire(fullReference, vtq);
        // A tag listed twice in one request puts the id in the bag twice — Distinct keeps it
        // to one notification per subscription.
        foreach (var sid in bag.Distinct())
        {
            OnDataChange?.Invoke(this, new OnDataChangeNotification
            {
                SubscriptionId = sid,
                Values = new[] { wireValue },
            });
        }
    }

    /// <summary>
    /// PR 14: asks the alarm tracker to track every alarm tag cached by the last
    /// <see cref="DiscoverAsync"/>. Best-effort per alarm — a failure on one tag is ignored
    /// so the remaining alarms are still attempted.
    /// </summary>
    public async Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct)
    {
        foreach (var tag in _discoveredAlarmTags)
        {
            try
            {
                await _alarmTracker.TrackAsync(tag).ConfigureAwait(false);
            }
            catch
            {
                /* swallow per-alarm — tracker rolls back its own state on failure */
            }
        }
    }

    /// <summary>
    /// PR 14: routes an operator acknowledge to the alarm tracker. The request's
    /// <c>EventId</c> is passed to the tracker as the alarm tag; blank ids are ignored and
    /// tracker failures are swallowed.
    /// </summary>
    public async Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct)
    {
        // Events raised by this backend carry a per-transition Guid as EventId and there is no
        // reverse map from that id to the alarm tag, so an ack only reaches the right alarm
        // when the caller puts the alarm full reference in EventId.
        // NOTE(review): acks that echo back an emitted EventId will not match a tracked alarm —
        // confirm the Proxy/Client convention or add an EventId → tag map.
        var tag = req.EventId;
        if (string.IsNullOrWhiteSpace(tag)) return;

        try
        {
            await _alarmTracker.AcknowledgeAsync(tag, req.Comment ?? string.Empty).ConfigureAwait(false);
        }
        catch
        {
            /* swallow — ack failures surface via MxAccessClient.WriteAsync logs */
        }
    }

    /// <summary>
    /// Reads raw historian samples for each requested tag over the given time range.
    /// Cancellation propagates; any other failure is reported in the response together with
    /// the tags that were read before the failure.
    /// </summary>
    public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
    {
        if (_historian is null)
            return new HistoryReadResponse
            {
                Success = false,
                Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
                Tags = Array.Empty<HistoryTagValues>(),
            };

        var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
        var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
        var tags = new List<HistoryTagValues>(req.TagReferences.Length);

        try
        {
            foreach (var reference in req.TagReferences)
            {
                var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
                tags.Add(new HistoryTagValues
                {
                    TagReference = reference,
                    Values = samples.Select(s => ToWire(reference, s)).ToArray(),
                });
            }

            return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            return new HistoryReadResponse
            {
                Success = false,
                Error = $"Historian read failed: {ex.Message}",
                Tags = tags.ToArray(),
            };
        }
    }

    /// <summary>
    /// Reads aggregate (processed) historian values for one tag. Requires a positive
    /// <c>IntervalMs</c>. Cancellation propagates; other failures are reported in the response.
    /// </summary>
    public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
        HistoryReadProcessedRequest req, CancellationToken ct)
    {
        if (_historian is null)
            return new HistoryReadProcessedResponse
            {
                Success = false,
                Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
                Values = Array.Empty<GalaxyDataValue>(),
            };

        if (req.IntervalMs <= 0)
            return new HistoryReadProcessedResponse
            {
                Success = false,
                Error = "HistoryReadProcessed requires IntervalMs > 0",
                Values = Array.Empty<GalaxyDataValue>(),
            };

        var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
        var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;

        try
        {
            var samples = await _historian.ReadAggregateAsync(
                req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
            var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
            return new HistoryReadProcessedResponse { Success = true, Values = wire };
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            return new HistoryReadProcessedResponse
            {
                Success = false,
                Error = $"Historian aggregate read failed: {ex.Message}",
                Values = Array.Empty<GalaxyDataValue>(),
            };
        }
    }

    /// <summary>
    /// Reads historian values for one tag at the requested timestamps. An empty timestamp list
    /// succeeds with no values. Cancellation propagates; other failures are reported in the
    /// response.
    /// </summary>
    public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
        HistoryReadAtTimeRequest req, CancellationToken ct)
    {
        if (_historian is null)
            return new HistoryReadAtTimeResponse
            {
                Success = false,
                Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
                Values = Array.Empty<GalaxyDataValue>(),
            };

        if (req.TimestampsUtcUnixMs.Length == 0)
            return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };

        var timestamps = req.TimestampsUtcUnixMs
            .Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
            .ToArray();

        try
        {
            var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
            var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
            return new HistoryReadAtTimeResponse { Success = true, Values = wire };
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            return new HistoryReadAtTimeResponse
            {
                Success = false,
                Error = $"Historian at-time read failed: {ex.Message}",
                Values = Array.Empty<GalaxyDataValue>(),
            };
        }
    }

    /// <summary>
    /// Reads historical events for a source over the given time range. Event timestamps are
    /// stamped as UTC before conversion. Cancellation propagates; other failures are reported
    /// in the response.
    /// </summary>
    public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
        HistoryReadEventsRequest req, CancellationToken ct)
    {
        if (_historian is null)
            return new HistoryReadEventsResponse
            {
                Success = false,
                Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
                Events = Array.Empty<GalaxyHistoricalEvent>(),
            };

        var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
        var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;

        try
        {
            var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
            var wire = events.Select(e => new GalaxyHistoricalEvent
            {
                EventId = e.Id.ToString(),
                SourceName = e.Source,
                EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
                ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
                DisplayText = e.DisplayText,
                Severity = e.Severity,
            }).ToArray();
            return new HistoryReadEventsResponse { Success = true, Events = wire };
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            return new HistoryReadEventsResponse
            {
                Success = false,
                Error = $"Historian event read failed: {ex.Message}",
                Events = Array.Empty<GalaxyHistoricalEvent>(),
            };
        }
    }

    /// <summary>Always accepts the recycle request with a fixed 15-second grace period.</summary>
    public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
        => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

    /// <summary>
    /// Detaches the event handlers installed by the constructor and disposes the alarm
    /// tracker, the probe manager and the historian (if any). Safe to call more than once;
    /// only the first call has any effect.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        _alarmTracker.TransitionRaised -= _onAlarmTransition;
        _alarmTracker.Dispose();

        _probeManager.StateChanged -= _onProbeStateChanged;
        _probeManager.Dispose();

        _mx.ConnectionStateChanged -= _onConnectionStateChanged;
        _historian?.Dispose();
    }

    /// <summary>
    /// Maps a live MXAccess value to the wire shape. Quality follows the OPC DA bands:
    /// 192 and above is Good, 64–191 is Uncertain, below 64 is Bad.
    /// </summary>
    private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
    {
        TagReference = reference,
        ValueBytes = vtq.Value is null ? null : MessagePackSerializer.Serialize(vtq.Value),
        ValueMessagePackType = 0,
        StatusCode = vtq.Quality >= 192 ? StatusGood
                   : vtq.Quality >= 64 ? StatusUncertain
                   : StatusBad,
        SourceTimestampUtcUnixMs = new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
        ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    };

    /// <summary>
    /// Maps a raw historian sample to the wire shape; the status code comes from
    /// <c>HistorianQualityMapper.Map</c> applied to the sample's quality.
    /// </summary>
    private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
    {
        TagReference = reference,
        ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
        ValueMessagePackType = 0,
        StatusCode = HistorianQualityMapper.Map(sample.Quality),
        SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
        ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    };

    /// <summary>
    /// Maps one aggregate bucket to the wire shape. A bucket without a value is sent with no
    /// value bytes and OPC UA BadNoData; a bucket with a value is sent as Good.
    /// </summary>
    private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
    {
        TagReference = reference,
        ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
        ValueMessagePackType = 0,
        StatusCode = sample.Value is null ? StatusBadNoData : StatusGood,
        SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
        ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    };

    /// <summary>
    /// Maps a repository attribute row to the wire shape. <c>ArrayDim</c> is set only when the
    /// row carries a positive array dimension.
    /// </summary>
    private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
    {
        AttributeName = row.AttributeName,
        MxDataType = row.MxDataType,
        IsArray = row.IsArray,
        ArrayDim = row.ArrayDimension is int d and > 0 ? (uint)d : null,
        SecurityClassification = row.SecurityClassification,
        IsHistorized = row.IsHistorized,
        IsAlarm = row.IsAlarm,
    };

    /// <summary>
    /// Translates a Galaxy category id to its template category name; ids not listed here
    /// become <c>category-&lt;id&gt;</c>.
    /// </summary>
    private static string MapCategory(int categoryId) => categoryId switch
    {
        1 => "$WinPlatform",
        3 => "$AppEngine",
        4 => "$Area",
        10 => "$UserDefined",
        11 => "$ApplicationObject",
        13 => "$Area",
        17 => "$DeviceIntegration",
        24 => "$ViewEngine",
        26 => "$ViewApp",
        _ => $"category-{categoryId}",
    };
}