531 lines
23 KiB
C#
531 lines
23 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using MessagePack;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
|
|
|
/// <summary>
|
|
/// Production <see cref="IGalaxyBackend"/> — combines the SQL-backed
|
|
/// <see cref="GalaxyRepository"/> for Discover with the live MXAccess
|
|
/// <see cref="MxAccessClient"/> for Read / Write / Subscribe. History stays bad-coded
|
|
/// until the Wonderware Historian SDK plugin loader (Task B.1.h) lands. Alarms come from
|
|
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
|
|
/// (the v1 alarm subsystem is its own subtree).
|
|
/// </summary>
|
|
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|
{
|
|
private readonly GalaxyRepository _repository;
|
|
private readonly MxAccessClient _mx;
|
|
private readonly IHistorianDataSource? _historian;
|
|
private long _nextSessionId;
|
|
private long _nextSubscriptionId;
|
|
|
|
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
|
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
|
|
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
|
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
|
|
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
|
|
|
|
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
|
#pragma warning disable CS0067 // alarm wire-up deferred to PR 9
|
|
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
|
#pragma warning restore CS0067
|
|
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
|
|
|
private readonly System.EventHandler<bool> _onConnectionStateChanged;
|
|
private readonly GalaxyRuntimeProbeManager _probeManager;
|
|
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
|
|
|
|
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
|
{
|
|
_repository = repository;
|
|
_mx = mx;
|
|
_historian = historian;
|
|
|
|
// PR 8: gateway-level host-status push. When the MXAccess COM proxy transitions
|
|
// connected↔disconnected, raise OnHostStatusChanged with a synthetic host entry named
|
|
// after the Wonderware client identity so the Admin UI surfaces top-level transport
|
|
// health even before per-platform/per-engine probing lands (deferred to a later PR that
|
|
// ports v1's GalaxyRuntimeProbeManager with ScanState subscriptions).
|
|
_onConnectionStateChanged = (_, connected) =>
|
|
{
|
|
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
|
|
{
|
|
HostName = _mx.ClientName,
|
|
RuntimeStatus = connected ? "Running" : "Stopped",
|
|
LastObservedUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
|
});
|
|
};
|
|
_mx.ConnectionStateChanged += _onConnectionStateChanged;
|
|
|
|
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
|
|
// which runs the state machine and raises StateChanged on transitions we care about.
|
|
// We forward each transition through the same OnHostStatusChanged IPC event that the
|
|
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
|
|
// Admin UI can show per-host health independently from the top-level transport status.
|
|
_probeManager = new GalaxyRuntimeProbeManager(
|
|
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
|
|
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
|
|
_onProbeStateChanged = (_, t) =>
|
|
{
|
|
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
|
|
{
|
|
HostName = t.TagName,
|
|
RuntimeStatus = t.NewState switch
|
|
{
|
|
HostRuntimeState.Running => "Running",
|
|
HostRuntimeState.Stopped => "Stopped",
|
|
_ => "Unknown",
|
|
},
|
|
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
});
|
|
};
|
|
_probeManager.StateChanged += _onProbeStateChanged;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
|
|
/// <c>SyncProbesAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
|
|
/// advise ScanState per host.
|
|
/// </summary>
|
|
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
|
|
|
|
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
await _mx.ConnectAsync();
|
|
return new OpenSessionResponse { Success = true, SessionId = Interlocked.Increment(ref _nextSessionId) };
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
return new OpenSessionResponse { Success = false, Error = $"MXAccess connect failed: {ex.Message}" };
|
|
}
|
|
}
|
|
|
|
public async Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct)
|
|
{
|
|
await _mx.DisconnectAsync();
|
|
}
|
|
|
|
public async Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
var hierarchy = await _repository.GetHierarchyAsync(ct).ConfigureAwait(false);
|
|
var attributes = await _repository.GetAttributesAsync(ct).ConfigureAwait(false);
|
|
|
|
var attrsByGobject = attributes
|
|
.GroupBy(a => a.GobjectId)
|
|
.ToDictionary(g => g.Key, g => g.Select(MapAttribute).ToArray());
|
|
var nameByGobject = hierarchy.ToDictionary(o => o.GobjectId, o => o.TagName);
|
|
|
|
var objects = hierarchy.Select(o => new GalaxyObjectInfo
|
|
{
|
|
ContainedName = string.IsNullOrEmpty(o.ContainedName) ? o.TagName : o.ContainedName,
|
|
TagName = o.TagName,
|
|
ParentContainedName = o.ParentGobjectId != 0 && nameByGobject.TryGetValue(o.ParentGobjectId, out var p) ? p : null,
|
|
TemplateCategory = MapCategory(o.CategoryId),
|
|
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
|
|
}).ToArray();
|
|
|
|
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
|
|
// so ScanState subscriptions track the current runtime set. Best-effort — probe
|
|
// failures don't block Discover from returning, since the gateway-level signal from
|
|
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
|
|
// that level if any per-host probe couldn't advise.
|
|
try
|
|
{
|
|
var targets = hierarchy
|
|
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|
|
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
|
|
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
|
|
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
|
|
}
|
|
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
|
|
|
|
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
return new DiscoverHierarchyResponse { Success = false, Error = ex.Message, Objects = Array.Empty<GalaxyObjectInfo>() };
|
|
}
|
|
}
|
|
|
|
public async Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct)
|
|
{
|
|
if (!_mx.IsConnected) return new ReadValuesResponse { Success = false, Error = "Not connected", Values = Array.Empty<GalaxyDataValue>() };
|
|
|
|
var results = new List<GalaxyDataValue>(req.TagReferences.Length);
|
|
foreach (var reference in req.TagReferences)
|
|
{
|
|
try
|
|
{
|
|
var vtq = await _mx.ReadAsync(reference, TimeSpan.FromSeconds(5), ct);
|
|
results.Add(ToWire(reference, vtq));
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
results.Add(new GalaxyDataValue
|
|
{
|
|
TagReference = reference,
|
|
StatusCode = 0x80020000u, // Bad_InternalError
|
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
|
ValueBytes = MessagePackSerializer.Serialize(ex.Message),
|
|
});
|
|
}
|
|
}
|
|
|
|
return new ReadValuesResponse { Success = true, Values = results.ToArray() };
|
|
}
|
|
|
|
public async Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct)
|
|
{
|
|
var results = new List<WriteValueResult>(req.Writes.Length);
|
|
foreach (var w in req.Writes)
|
|
{
|
|
try
|
|
{
|
|
// Decode the value back from the MessagePack bytes the Proxy sent.
|
|
var value = w.ValueBytes is null
|
|
? null
|
|
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
|
|
|
|
var ok = await _mx.WriteAsync(w.TagReference, value!);
|
|
results.Add(new WriteValueResult
|
|
{
|
|
TagReference = w.TagReference,
|
|
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
|
|
Error = ok ? null : "MXAccess runtime reported write failure",
|
|
});
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0x80020000u, Error = ex.Message });
|
|
}
|
|
}
|
|
return new WriteValuesResponse { Results = results.ToArray() };
|
|
}
|
|
|
|
public async Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct)
|
|
{
|
|
var sid = Interlocked.Increment(ref _nextSubscriptionId);
|
|
|
|
try
|
|
{
|
|
foreach (var tag in req.TagReferences)
|
|
{
|
|
_refToSubs.AddOrUpdate(tag,
|
|
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
|
|
(_, bag) => { bag.Add(sid); return bag; });
|
|
|
|
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
|
|
// fires for every active subscription of that tag — we fan out by SubscriptionId.
|
|
await _mx.SubscribeAsync(tag, OnTagValueChanged);
|
|
}
|
|
|
|
_subs[sid] = req.TagReferences;
|
|
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
return new SubscribeResponse { Success = false, Error = ex.Message };
|
|
}
|
|
}
|
|
|
|
public async Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct)
|
|
{
|
|
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
|
|
foreach (var r in refs)
|
|
{
|
|
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
|
|
// other subscription is still listening (multiple Proxy subs may share a tag).
|
|
_refToSubs.TryGetValue(r, out var bag);
|
|
if (bag is not null)
|
|
{
|
|
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
|
|
bag.Where(id => id != req.SubscriptionId));
|
|
if (remaining.IsEmpty)
|
|
{
|
|
_refToSubs.TryRemove(r, out _);
|
|
await _mx.UnsubscribeAsync(r);
|
|
}
|
|
else
|
|
{
|
|
_refToSubs[r] = remaining;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
|
|
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
|
|
/// subscription that includes this tag — the IPC sink translates that into outbound
|
|
/// <c>OnDataChangeNotification</c> frames.
|
|
/// </summary>
|
|
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
|
|
{
|
|
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
|
|
|
|
var wireValue = ToWire(fullReference, vtq);
|
|
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
|
|
// each ISubscribable consumer based on the SubscriptionId in the payload.
|
|
foreach (var sid in bag.Distinct())
|
|
{
|
|
OnDataChange?.Invoke(this, new OnDataChangeNotification
|
|
{
|
|
SubscriptionId = sid,
|
|
Values = new[] { wireValue },
|
|
});
|
|
}
|
|
}
|
|
|
|
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
|
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
|
|
|
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
|
{
|
|
if (_historian is null)
|
|
return new HistoryReadResponse
|
|
{
|
|
Success = false,
|
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
|
Tags = Array.Empty<HistoryTagValues>(),
|
|
};
|
|
|
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
|
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
|
|
|
|
try
|
|
{
|
|
foreach (var reference in req.TagReferences)
|
|
{
|
|
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
|
|
tags.Add(new HistoryTagValues
|
|
{
|
|
TagReference = reference,
|
|
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
|
|
});
|
|
}
|
|
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
|
|
}
|
|
catch (OperationCanceledException) { throw; }
|
|
catch (Exception ex)
|
|
{
|
|
return new HistoryReadResponse
|
|
{
|
|
Success = false,
|
|
Error = $"Historian read failed: {ex.Message}",
|
|
Tags = tags.ToArray(),
|
|
};
|
|
}
|
|
}
|
|
|
|
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
|
|
HistoryReadProcessedRequest req, CancellationToken ct)
|
|
{
|
|
if (_historian is null)
|
|
return new HistoryReadProcessedResponse
|
|
{
|
|
Success = false,
|
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
|
Values = Array.Empty<GalaxyDataValue>(),
|
|
};
|
|
|
|
if (req.IntervalMs <= 0)
|
|
return new HistoryReadProcessedResponse
|
|
{
|
|
Success = false,
|
|
Error = "HistoryReadProcessed requires IntervalMs > 0",
|
|
Values = Array.Empty<GalaxyDataValue>(),
|
|
};
|
|
|
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
|
|
|
try
|
|
{
|
|
var samples = await _historian.ReadAggregateAsync(
|
|
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
|
|
|
|
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
|
|
return new HistoryReadProcessedResponse { Success = true, Values = wire };
|
|
}
|
|
catch (OperationCanceledException) { throw; }
|
|
catch (Exception ex)
|
|
{
|
|
return new HistoryReadProcessedResponse
|
|
{
|
|
Success = false,
|
|
Error = $"Historian aggregate read failed: {ex.Message}",
|
|
Values = Array.Empty<GalaxyDataValue>(),
|
|
};
|
|
}
|
|
}
|
|
|
|
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
|
|
HistoryReadAtTimeRequest req, CancellationToken ct)
|
|
{
|
|
if (_historian is null)
|
|
return new HistoryReadAtTimeResponse
|
|
{
|
|
Success = false,
|
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
|
Values = Array.Empty<GalaxyDataValue>(),
|
|
};
|
|
|
|
if (req.TimestampsUtcUnixMs.Length == 0)
|
|
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
|
|
|
|
var timestamps = req.TimestampsUtcUnixMs
|
|
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
|
|
.ToArray();
|
|
|
|
try
|
|
{
|
|
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
|
|
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
|
|
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
|
|
}
|
|
catch (OperationCanceledException) { throw; }
|
|
catch (Exception ex)
|
|
{
|
|
return new HistoryReadAtTimeResponse
|
|
{
|
|
Success = false,
|
|
Error = $"Historian at-time read failed: {ex.Message}",
|
|
Values = Array.Empty<GalaxyDataValue>(),
|
|
};
|
|
}
|
|
}
|
|
|
|
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
|
HistoryReadEventsRequest req, CancellationToken ct)
|
|
{
|
|
if (_historian is null)
|
|
return new HistoryReadEventsResponse
|
|
{
|
|
Success = false,
|
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
|
};
|
|
|
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
|
|
|
try
|
|
{
|
|
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
|
|
var wire = events.Select(e => new GalaxyHistoricalEvent
|
|
{
|
|
EventId = e.Id.ToString(),
|
|
SourceName = e.Source,
|
|
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
DisplayText = e.DisplayText,
|
|
Severity = e.Severity,
|
|
}).ToArray();
|
|
return new HistoryReadEventsResponse { Success = true, Events = wire };
|
|
}
|
|
catch (OperationCanceledException) { throw; }
|
|
catch (Exception ex)
|
|
{
|
|
return new HistoryReadEventsResponse
|
|
{
|
|
Success = false,
|
|
Error = $"Historian event read failed: {ex.Message}",
|
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
|
};
|
|
}
|
|
}
|
|
|
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
|
|
|
public void Dispose()
|
|
{
|
|
_probeManager.StateChanged -= _onProbeStateChanged;
|
|
_probeManager.Dispose();
|
|
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
|
|
_historian?.Dispose();
|
|
}
|
|
|
|
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
|
|
{
|
|
TagReference = reference,
|
|
ValueBytes = vtq.Value is null ? null : MessagePackSerializer.Serialize(vtq.Value),
|
|
ValueMessagePackType = 0,
|
|
StatusCode = vtq.Quality >= 192 ? 0u : 0x40000000u, // Good vs Uncertain placeholder
|
|
SourceTimestampUtcUnixMs = new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
|
};
|
|
|
|
/// <summary>
|
|
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
|
|
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
|
|
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
|
|
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
|
|
/// the hop intact.
|
|
/// </summary>
|
|
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
|
|
{
|
|
TagReference = reference,
|
|
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
|
|
ValueMessagePackType = 0,
|
|
StatusCode = HistorianQualityMapper.Map(sample.Quality),
|
|
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
|
};
|
|
|
|
|
|
/// <summary>
|
|
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
|
|
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
|
|
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
|
|
/// </summary>
|
|
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
|
|
{
|
|
TagReference = reference,
|
|
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
|
|
ValueMessagePackType = 0,
|
|
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
|
|
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
|
};
|
|
|
|
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
|
|
{
|
|
AttributeName = row.AttributeName,
|
|
MxDataType = row.MxDataType,
|
|
IsArray = row.IsArray,
|
|
ArrayDim = row.ArrayDimension is int d and > 0 ? (uint)d : null,
|
|
SecurityClassification = row.SecurityClassification,
|
|
IsHistorized = row.IsHistorized,
|
|
IsAlarm = row.IsAlarm,
|
|
};
|
|
|
|
private static string MapCategory(int categoryId) => categoryId switch
|
|
{
|
|
1 => "$WinPlatform",
|
|
3 => "$AppEngine",
|
|
4 => "$Area",
|
|
10 => "$UserDefined",
|
|
11 => "$ApplicationObject",
|
|
13 => "$Area",
|
|
17 => "$DeviceIntegration",
|
|
24 => "$ViewEngine",
|
|
26 => "$ViewApp",
|
|
_ => $"category-{categoryId}",
|
|
};
|
|
}
|