lmxopcua/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs
Joseph Doherty c14624f012 Phase 2 PR 14 — alarm subsystem wire-up. Per IsAlarm=true attribute (PR 9 added the discovery flag), GalaxyAlarmTracker in Backend/Alarms/ advises the four Galaxy alarm-state attributes: .InAlarm (boolean alarm active), .Priority (int severity), .DescAttrName (human-readable description), .Acked (boolean acknowledged). Runs the OPC UA Part 9 alarm lifecycle state machine simplified for the Galaxy AlarmExtension model and raises AlarmTransition events on transitions operators must react to — Active (InAlarm false→true, default Unacknowledged), Acknowledged (Acked false→true while InAlarm still true), Inactive (InAlarm true→false). MxAccessGalaxyBackend instantiates the tracker in its constructor with delegate-based subscribe/unsubscribe/write pointers to MxAccessClient, hooks TransitionRaised to forward each transition through the existing OnAlarmEvent IPC event that PR 4 ConnectionSink wires into MessageKind.AlarmEvent frames — no new contract messages required since GalaxyAlarmEvent already exists in Shared.Contracts. Field mapping: EventId = fresh Guid.ToString('N') per transition, ObjectTagName = alarm attribute full reference, AlarmName = alarm attribute full reference, Severity = tracked Priority, StateTransition = 'Active'|'Acknowledged'|'Inactive', Message = DescAttrName or tag fallback, UtcUnixMs = transition time. DiscoverAsync caches every IsAlarm=true attribute's full reference (tag.attribute) into _discoveredAlarmTags (ConcurrentBag cleared-then-filled on every re-Discover to track Galaxy redeploys). SubscribeAlarmsAsync iterates the cache and advises each via GalaxyAlarmTracker.TrackAsync; best-effort per-alarm — a subscribe failure on one alarm doesn't abort the whole call since operators prefer partial alarm coverage to none. Tracker is internally idempotent on repeat Track calls (second invocation for same alarm tag is a no-op; already-subscribed check short-circuits before the 4 MXAccess sub calls). 
Subscribe-failure rollback inside TrackAsync removes the alarm state + unadvises any of the 4 that did succeed so a partial advise can't leak a phantom tracking entry. AcknowledgeAlarmAsync routes to tracker.AcknowledgeAsync which writes the operator comment to <tag>.AckMsg via MxAccessClient.WriteAsync — writes use the existing MXAccess OnWriteComplete TCS-by-handle path (PR 4 Medium 4) so a runtime-refused ack bubbles up as Success=false rather than false-positive. State-machine quirks preserved from v1: (1) initial Acked=true on subscribe does NOT fire Acknowledged (alarm at rest, pre-acknowledged — default state is Acked=true so the first subscribe callback is a no-op transition), (2) Acked false→true only fires Acknowledged when InAlarm is currently true (acking a latched-inactive alarm is not a user-visible transition), (3) Active transition clears the Acked flag in-state so the next Acked callback correctly fires Acknowledged (v1 had this buried in the ConditionState logic; we track it on the AlarmState struct directly). Priority value handled as int/short/long via type pattern match with int.MaxValue guard — Galaxy attribute category returns varying CLR types (Int32 is canonical but some older templates use Int16), and a long overflow cast to int would silently corrupt the severity. Dispose cascade in MxAccessGalaxyBackend.Dispose: alarm-tracker unsubscribe→dispose, probe-manager unsubscribe→dispose, mx.ConnectionStateChanged detach, historian dispose — same discipline PR 6 / PR 8 / PR 13 established so dangling invocation-list refs don't survive a backend recycle. #pragma warning disable CS0067 around OnAlarmEvent removed since the event is now raised. 
Tests (9 new, GalaxyAlarmTrackerTests): four-attribute subscribe per alarm, idempotent repeat-track, InAlarm false→true fires Active with Priority + Desc, InAlarm true→false fires Inactive, Acked false→true while InAlarm fires Acknowledged, Acked transition while InAlarm=false does not fire, AckMsg write path carries the comment, snapshot reports latest four fields, foreign probe callback for a non-tracked tag is silently dropped. Full Galaxy.Host.Tests Unit suite 84 pass / 0 fail (9 new alarm + 12 PR 13 probe + 21 PR 12 quality + 42 pre-existing). Galaxy.Host builds clean (0/0). Branches off phase-2-pr13-runtime-probe so the MxAccessGalaxyBackend constructor/Dispose chain gets the probe-manager + alarm-tracker wire-up in a coherent order; fast-forwards if PR 13 merges first.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:34:13 -04:00
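
The lifecycle rules described in the commit message above can be sketched as a small state holder. This is a minimal sketch under stated assumptions, not the actual GalaxyAlarmTracker (which is not shown on this page): `AlarmStateSketch`, `SketchTransition`, `OnInAlarm`, `OnAcked`, and `CoercePriority` are hypothetical names, and keeping the previous priority when a `long` does not fit in an `int` is an assumption about what the guard does.

```csharp
#nullable enable

// Hypothetical names throughout; this only illustrates the rules from the commit message.
public enum SketchTransition { None, Active, Acknowledged, Inactive }

public sealed class AlarmStateSketch
{
    // An alarm at rest is not in alarm and counts as already acknowledged.
    public bool InAlarm { get; private set; }
    public bool Acked { get; private set; } = true;
    public int Priority { get; private set; }

    // Callback for the .InAlarm attribute.
    public SketchTransition OnInAlarm(bool value)
    {
        if (value == InAlarm) return SketchTransition.None;
        InAlarm = value;
        if (!value) return SketchTransition.Inactive;
        // Quirk 3: going Active clears Acked so the next Acked=true fires Acknowledged.
        Acked = false;
        return SketchTransition.Active;
    }

    // Callback for the .Acked attribute.
    public SketchTransition OnAcked(bool value)
    {
        var wasAcked = Acked;
        Acked = value;
        // Quirk 1: the initial Acked=true is not a false-to-true edge, so nothing fires.
        // Quirk 2: a false-to-true edge fires only while the alarm is still active.
        return !wasAcked && value && InAlarm
            ? SketchTransition.Acknowledged
            : SketchTransition.None;
    }

    // Callback for the .Priority attribute.
    public void OnPriority(object? raw) => Priority = CoercePriority(raw, Priority);

    // Accepts the CLR types the commit message lists (int, short, long).
    // Assumption: an out-of-range long or any other type keeps the fallback value.
    public static int CoercePriority(object? raw, int fallback) => raw switch
    {
        int i => i,
        short s => s,
        long l when l >= int.MinValue && l <= int.MaxValue => (int)l,
        _ => fallback,
    };
}
```

Feeding the sketch `OnInAlarm(true)`, `OnAcked(true)`, `OnInAlarm(false)` in that order yields Active, Acknowledged, Inactive. An `OnAcked(true)` before the first activation, or after the alarm has gone inactive, yields None.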


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// <summary>
/// Production <see cref="IGalaxyBackend"/>. Combines the SQL-backed
/// <see cref="GalaxyRepository"/> for Discover with the live MXAccess
/// <see cref="MxAccessClient"/> for Read / Write / Subscribe. History reads go through the
/// optional <see cref="IHistorianDataSource"/> and return <c>Success = false</c> when none
/// is configured. Alarms come from MxAccess <c>AlarmExtension</c> attributes, tracked by
/// <see cref="GalaxyAlarmTracker"/> and forwarded through <see cref="OnAlarmEvent"/> (PR 14).
/// </summary>
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
{
private readonly GalaxyRepository _repository;
private readonly MxAccessClient _mx;
private readonly IHistorianDataSource? _historian;
private long _nextSessionId;
private long _nextSubscriptionId;
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
private readonly System.EventHandler<bool> _onConnectionStateChanged;
private readonly GalaxyRuntimeProbeManager _probeManager;
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
private readonly GalaxyAlarmTracker _alarmTracker;
private readonly System.EventHandler<AlarmTransition> _onAlarmTransition;
// Cached during DiscoverAsync so SubscribeAlarmsAsync knows which attributes to advise.
// One entry per IsAlarm=true attribute in the last discovered hierarchy.
private readonly System.Collections.Concurrent.ConcurrentBag<string> _discoveredAlarmTags = new();
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
_repository = repository;
_mx = mx;
_historian = historian;
// PR 8: gateway-level host-status push. When the MXAccess COM proxy transitions
// connected↔disconnected, raise OnHostStatusChanged with a synthetic host entry named
// after the Wonderware client identity so the Admin UI surfaces top-level transport
// health independently of the per-platform/per-engine probes wired up in PR 13 below.
_onConnectionStateChanged = (_, connected) =>
{
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
{
HostName = _mx.ClientName,
RuntimeStatus = connected ? "Running" : "Stopped",
LastObservedUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
});
};
_mx.ConnectionStateChanged += _onConnectionStateChanged;
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
// which runs the state machine and raises StateChanged on transitions we care about.
// We forward each transition through the same OnHostStatusChanged IPC event that the
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
// Admin UI can show per-host health independently from the top-level transport status.
_probeManager = new GalaxyRuntimeProbeManager(
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
_onProbeStateChanged = (_, t) =>
{
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
{
HostName = t.TagName,
RuntimeStatus = t.NewState switch
{
HostRuntimeState.Running => "Running",
HostRuntimeState.Stopped => "Stopped",
_ => "Unknown",
},
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
});
};
_probeManager.StateChanged += _onProbeStateChanged;
// PR 14: alarm subsystem. Per IsAlarm=true attribute discovered, subscribe to the four
// alarm-state attributes (.InAlarm/.Priority/.DescAttrName/.Acked), track lifecycle,
// and raise GalaxyAlarmEvent on transitions — forwarded through the existing
// OnAlarmEvent IPC event that the PR 4 ConnectionSink already wires into AlarmEvent frames.
_alarmTracker = new GalaxyAlarmTracker(
subscribe: (tag, cb) => _mx.SubscribeAsync(tag, cb),
unsubscribe: tag => _mx.UnsubscribeAsync(tag),
write: (tag, v) => _mx.WriteAsync(tag, v));
_onAlarmTransition = (_, t) => OnAlarmEvent?.Invoke(this, new GalaxyAlarmEvent
{
EventId = Guid.NewGuid().ToString("N"),
ObjectTagName = t.AlarmTag,
AlarmName = t.AlarmTag,
Severity = t.Priority,
StateTransition = t.Transition switch
{
AlarmStateTransition.Active => "Active",
AlarmStateTransition.Acknowledged => "Acknowledged",
AlarmStateTransition.Inactive => "Inactive",
_ => "Unknown",
},
Message = t.DescAttrName ?? t.AlarmTag,
UtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
});
_alarmTracker.TransitionRaised += _onAlarmTransition;
}
/// <summary>
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
/// <c>SyncAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
/// advise ScanState per host.
/// </summary>
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
try
{
await _mx.ConnectAsync();
return new OpenSessionResponse { Success = true, SessionId = Interlocked.Increment(ref _nextSessionId) };
}
catch (Exception ex)
{
return new OpenSessionResponse { Success = false, Error = $"MXAccess connect failed: {ex.Message}" };
}
}
public async Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct)
{
await _mx.DisconnectAsync();
}
public async Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct)
{
try
{
var hierarchy = await _repository.GetHierarchyAsync(ct).ConfigureAwait(false);
var attributes = await _repository.GetAttributesAsync(ct).ConfigureAwait(false);
var attrsByGobject = attributes
.GroupBy(a => a.GobjectId)
.ToDictionary(g => g.Key, g => g.Select(MapAttribute).ToArray());
var nameByGobject = hierarchy.ToDictionary(o => o.GobjectId, o => o.TagName);
var objects = hierarchy.Select(o => new GalaxyObjectInfo
{
ContainedName = string.IsNullOrEmpty(o.ContainedName) ? o.TagName : o.ContainedName,
TagName = o.TagName,
ParentContainedName = o.ParentGobjectId != 0 && nameByGobject.TryGetValue(o.ParentGobjectId, out var p) ? p : null,
TemplateCategory = MapCategory(o.CategoryId),
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
}).ToArray();
// PR 14: cache alarm-bearing attribute full refs so SubscribeAlarmsAsync can advise
// them on demand. Format matches the Galaxy reference grammar <tag>.<attr>.
var freshAlarmTags = attributes
.Where(a => a.IsAlarm)
.Select(a => nameByGobject.TryGetValue(a.GobjectId, out var tn)
? tn + "." + a.AttributeName
: null)
.Where(s => !string.IsNullOrWhiteSpace(s))
.Cast<string>()
.ToArray();
while (_discoveredAlarmTags.TryTake(out _)) { }
foreach (var t in freshAlarmTags) _discoveredAlarmTags.Add(t);
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
// so ScanState subscriptions track the current runtime set. Best-effort — probe
// failures don't block Discover from returning, since the gateway-level signal from
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
// that level if any per-host probe couldn't advise.
try
{
var targets = hierarchy
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
}
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
}
catch (Exception ex)
{
return new DiscoverHierarchyResponse { Success = false, Error = ex.Message, Objects = Array.Empty<GalaxyObjectInfo>() };
}
}
public async Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct)
{
if (!_mx.IsConnected) return new ReadValuesResponse { Success = false, Error = "Not connected", Values = Array.Empty<GalaxyDataValue>() };
var results = new List<GalaxyDataValue>(req.TagReferences.Length);
foreach (var reference in req.TagReferences)
{
try
{
var vtq = await _mx.ReadAsync(reference, TimeSpan.FromSeconds(5), ct);
results.Add(ToWire(reference, vtq));
}
catch (Exception ex)
{
results.Add(new GalaxyDataValue
{
TagReference = reference,
StatusCode = 0x80020000u, // Bad_InternalError
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
ValueBytes = MessagePackSerializer.Serialize(ex.Message),
});
}
}
return new ReadValuesResponse { Success = true, Values = results.ToArray() };
}
public async Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct)
{
var results = new List<WriteValueResult>(req.Writes.Length);
foreach (var w in req.Writes)
{
try
{
// Decode the value back from the MessagePack bytes the Proxy sent.
var value = w.ValueBytes is null
? null
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
var ok = await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult
{
TagReference = w.TagReference,
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
Error = ok ? null : "MXAccess runtime reported write failure",
});
}
catch (Exception ex)
{
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0x80020000u, Error = ex.Message });
}
}
return new WriteValuesResponse { Results = results.ToArray() };
}
public async Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct)
{
var sid = Interlocked.Increment(ref _nextSubscriptionId);
try
{
foreach (var tag in req.TagReferences)
{
_refToSubs.AddOrUpdate(tag,
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
(_, bag) => { bag.Add(sid); return bag; });
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
// fires for every active subscription of that tag — we fan out by SubscriptionId.
await _mx.SubscribeAsync(tag, OnTagValueChanged);
}
_subs[sid] = req.TagReferences;
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
}
catch (Exception ex)
{
return new SubscribeResponse { Success = false, Error = ex.Message };
}
}
public async Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct)
{
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
foreach (var r in refs)
{
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
// other subscription is still listening (multiple Proxy subs may share a tag).
_refToSubs.TryGetValue(r, out var bag);
if (bag is not null)
{
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
bag.Where(id => id != req.SubscriptionId));
if (remaining.IsEmpty)
{
_refToSubs.TryRemove(r, out _);
await _mx.UnsubscribeAsync(r);
}
else
{
_refToSubs[r] = remaining;
}
}
}
}
/// <summary>
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
/// subscription that includes this tag — the IPC sink translates that into outbound
/// <c>OnDataChangeNotification</c> frames.
/// </summary>
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
{
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
var wireValue = ToWire(fullReference, vtq);
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
// each ISubscribable consumer based on the SubscriptionId in the payload.
foreach (var sid in bag.Distinct())
{
OnDataChange?.Invoke(this, new OnDataChangeNotification
{
SubscriptionId = sid,
Values = new[] { wireValue },
});
}
}
/// <summary>
/// PR 14: advise every alarm-bearing attribute's 4-attr quartet. Best-effort per-alarm —
/// a subscribe failure on one alarm doesn't abort the whole call, since operators prefer
/// partial alarm coverage to none. Idempotent on repeat calls (tracker internally
/// skips already-tracked alarms).
/// </summary>
public async Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct)
{
foreach (var tag in _discoveredAlarmTags)
{
try { await _alarmTracker.TrackAsync(tag).ConfigureAwait(false); }
catch { /* swallow per-alarm — tracker rolls back its own state on failure */ }
}
}
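/// <summary>
/// PR 14: route operator ack through the tracker's AckMsg write path. The request's
/// EventId is currently treated as the alarm full reference; see the inline note in the
/// method body for why the per-transition Guid emitted in <see cref="GalaxyAlarmEvent"/>
/// cannot be mapped back to a tag yet. Ack failures are swallowed here and are not
/// reported to the caller.
/// </summary>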
public async Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct)
{
// Events raised by this backend carry a per-transition Guid.ToString("N") as EventId,
// and there is no reverse map from event id to alarm tag yet. v1's convention (ack
// targets the condition) relied on the Comment envelope, where v1 packed
// "<tag>|<comment>"; that envelope is not parsed here. Until the Proxy is updated to
// send the alarm tag separately, fall back to treating the incoming EventId as the
// alarm tag. The Client CLI passes it through unchanged.
var tag = req.EventId;
if (!string.IsNullOrWhiteSpace(tag))
{
try { await _alarmTracker.AcknowledgeAsync(tag, req.Comment ?? string.Empty).ConfigureAwait(false); }
catch { /* swallow — ack failures surface via MxAccessClient.WriteAsync logs */ }
}
}
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Tags = Array.Empty<HistoryTagValues>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
try
{
foreach (var reference in req.TagReferences)
{
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
tags.Add(new HistoryTagValues
{
TagReference = reference,
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
});
}
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadResponse
{
Success = false,
Error = $"Historian read failed: {ex.Message}",
Tags = tags.ToArray(),
};
}
}
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.IntervalMs <= 0)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "HistoryReadProcessed requires IntervalMs > 0",
Values = Array.Empty<GalaxyDataValue>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var samples = await _historian.ReadAggregateAsync(
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadProcessedResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadProcessedResponse
{
Success = false,
Error = $"Historian aggregate read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadAtTimeResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.TimestampsUtcUnixMs.Length == 0)
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
var timestamps = req.TimestampsUtcUnixMs
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
.ToArray();
try
{
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadAtTimeResponse
{
Success = false,
Error = $"Historian at-time read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadEventsResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
var wire = events.Select(e => new GalaxyHistoricalEvent
{
EventId = e.Id.ToString(),
SourceName = e.Source,
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
DisplayText = e.DisplayText,
Severity = e.Severity,
}).ToArray();
return new HistoryReadEventsResponse { Success = true, Events = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadEventsResponse
{
Success = false,
Error = $"Historian event read failed: {ex.Message}",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
public void Dispose()
{
_alarmTracker.TransitionRaised -= _onAlarmTransition;
_alarmTracker.Dispose();
_probeManager.StateChanged -= _onProbeStateChanged;
_probeManager.Dispose();
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
_historian?.Dispose();
}
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
{
TagReference = reference,
ValueBytes = vtq.Value is null ? null : MessagePackSerializer.Serialize(vtq.Value),
ValueMessagePackType = 0,
StatusCode = vtq.Quality >= 192 ? 0u : 0x40000000u, // Good vs Uncertain placeholder
SourceTimestampUtcUnixMs = new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
/// <summary>
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
/// the hop intact.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
ValueMessagePackType = 0,
StatusCode = HistorianQualityMapper.Map(sample.Quality),
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
/// <summary>
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
ValueMessagePackType = 0,
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{
AttributeName = row.AttributeName,
MxDataType = row.MxDataType,
IsArray = row.IsArray,
ArrayDim = row.ArrayDimension is int d and > 0 ? (uint)d : null,
SecurityClassification = row.SecurityClassification,
IsHistorized = row.IsHistorized,
IsAlarm = row.IsAlarm,
};
private static string MapCategory(int categoryId) => categoryId switch
{
1 => "$WinPlatform",
3 => "$AppEngine",
4 => "$Area",
10 => "$UserDefined",
11 => "$ApplicationObject",
13 => "$Area",
17 => "$DeviceIntegration",
24 => "$ViewEngine",
26 => "$ViewApp",
_ => $"category-{categoryId}",
};
}
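
The `ToWire(string, Vtq)` overload above collapses the quality value into Good vs Uncertain and marks that as a placeholder. A minimal sketch of a three-way mapping follows, assuming the quality value uses the OPC DA layout in which the top two bits of the low byte carry the major status. `QualityByteSketch` and `ToStatusCode` are hypothetical names; this is not the `QualityMapper` or `HistorianQualityMapper` referenced in the file, whose behavior is not shown here.

```csharp
// Hypothetical helper for illustration; not part of the file above.
public static class QualityByteSketch
{
    // OPC UA status codes with only the severity bits set.
    public const uint Good = 0x00000000u;
    public const uint Uncertain = 0x40000000u;
    public const uint Bad = 0x80000000u;

    // OPC DA major status lives in bits 7..6 of the quality byte:
    // 11 = good, 01 = uncertain, 00 = bad. The 10 pattern is unassigned
    // and is treated as bad in this sketch.
    public static uint ToStatusCode(int quality) => (quality & 0xC0) switch
    {
        0xC0 => Good,
        0x40 => Uncertain,
        _ => Bad,
    };
}
```

For byte-sized values, the `0xC0` arm matches the same inputs as the existing `vtq.Quality >= 192` test. The difference is that values below 64 map to Bad here instead of Uncertain.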