462 lines
18 KiB
C#
462 lines
18 KiB
C#
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
|
||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;
|
||
|
||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
|
||
|
||
/// <summary>
|
||
/// <see cref="IDriver"/> implementation that forwards every capability over the Galaxy IPC
|
||
/// channel to the out-of-process Host. Implements the full Phase 2 capability surface;
|
||
/// bodies that depend on the deferred Host-side MXAccess code lift will surface
|
||
/// <see cref="GalaxyIpcException"/> with code <c>not-implemented</c> until the Host's
|
||
/// <c>IGalaxyBackend</c> is wired to the real <c>MxAccessClient</c>.
|
||
/// </summary>
|
||
public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
|
||
: IDriver,
|
||
ITagDiscovery,
|
||
IReadable,
|
||
IWritable,
|
||
ISubscribable,
|
||
IAlarmSource,
|
||
IHistoryProvider,
|
||
IRediscoverable,
|
||
IHostConnectivityProbe,
|
||
IDisposable
|
||
{
|
||
private GalaxyIpcClient? _client;
|
||
private long _sessionId;
|
||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||
|
||
private IReadOnlyList<Core.Abstractions.HostConnectivityStatus> _hostStatuses = [];
|
||
|
||
public string DriverInstanceId => options.DriverInstanceId;
|
||
public string DriverType => "Galaxy";
|
||
|
||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
|
||
public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;
|
||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||
|
||
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||
{
|
||
_health = new DriverHealth(DriverState.Initializing, null, null);
|
||
try
|
||
{
|
||
_client = await GalaxyIpcClient.ConnectAsync(
|
||
options.PipeName, options.SharedSecret, options.ConnectTimeout, cancellationToken);
|
||
|
||
var resp = await _client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
|
||
MessageKind.OpenSessionRequest,
|
||
new OpenSessionRequest { DriverInstanceId = DriverInstanceId, DriverConfigJson = driverConfigJson },
|
||
MessageKind.OpenSessionResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host OpenSession failed: {resp.Error}");
|
||
|
||
_sessionId = resp.SessionId;
|
||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_health = new DriverHealth(DriverState.Faulted, null, ex.Message);
|
||
throw;
|
||
}
|
||
}
|
||
|
||
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||
{
|
||
await ShutdownAsync(cancellationToken);
|
||
await InitializeAsync(driverConfigJson, cancellationToken);
|
||
}
|
||
|
||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||
{
|
||
if (_client is null) return;
|
||
|
||
try
|
||
{
|
||
await _client.SendOneWayAsync(
|
||
MessageKind.CloseSessionRequest,
|
||
new CloseSessionRequest { SessionId = _sessionId },
|
||
cancellationToken);
|
||
}
|
||
catch { /* shutdown is best effort */ }
|
||
|
||
await _client.DisposeAsync();
|
||
_client = null;
|
||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||
}
|
||
|
||
public DriverHealth GetHealth() => _health;
|
||
public long GetMemoryFootprint() => 0;
|
||
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||
|
||
// ---- ITagDiscovery ----
|
||
|
||
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||
{
|
||
ArgumentNullException.ThrowIfNull(builder);
|
||
var client = RequireClient();
|
||
|
||
var resp = await client.CallAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
|
||
MessageKind.DiscoverHierarchyRequest,
|
||
new DiscoverHierarchyRequest { SessionId = _sessionId },
|
||
MessageKind.DiscoverHierarchyResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host DiscoverHierarchy failed: {resp.Error}");
|
||
|
||
foreach (var obj in resp.Objects)
|
||
{
|
||
var folder = builder.Folder(obj.ContainedName, obj.ContainedName);
|
||
foreach (var attr in obj.Attributes)
|
||
{
|
||
folder.Variable(
|
||
attr.AttributeName,
|
||
attr.AttributeName,
|
||
new DriverAttributeInfo(
|
||
FullName: $"{obj.TagName}.{attr.AttributeName}",
|
||
DriverDataType: MapDataType(attr.MxDataType),
|
||
IsArray: attr.IsArray,
|
||
ArrayDim: attr.ArrayDim,
|
||
SecurityClass: MapSecurity(attr.SecurityClassification),
|
||
IsHistorized: attr.IsHistorized,
|
||
IsAlarm: attr.IsAlarm));
|
||
}
|
||
}
|
||
}
|
||
|
||
// ---- IReadable ----
|
||
|
||
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var resp = await client.CallAsync<ReadValuesRequest, ReadValuesResponse>(
|
||
MessageKind.ReadValuesRequest,
|
||
new ReadValuesRequest { SessionId = _sessionId, TagReferences = [.. fullReferences] },
|
||
MessageKind.ReadValuesResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host ReadValues failed: {resp.Error}");
|
||
|
||
var byRef = resp.Values.ToDictionary(v => v.TagReference);
|
||
var result = new DataValueSnapshot[fullReferences.Count];
|
||
for (var i = 0; i < fullReferences.Count; i++)
|
||
{
|
||
result[i] = byRef.TryGetValue(fullReferences[i], out var v)
|
||
? ToSnapshot(v)
|
||
: new DataValueSnapshot(null, StatusBadInternalError, null, DateTime.UtcNow);
|
||
}
|
||
return result;
|
||
}
|
||
|
||
// ---- IWritable ----
|
||
|
||
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
|
||
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var resp = await client.CallAsync<WriteValuesRequest, WriteValuesResponse>(
|
||
MessageKind.WriteValuesRequest,
|
||
new WriteValuesRequest
|
||
{
|
||
SessionId = _sessionId,
|
||
Writes = [.. writes.Select(FromWriteRequest)],
|
||
},
|
||
MessageKind.WriteValuesResponse,
|
||
cancellationToken);
|
||
|
||
return [.. resp.Results.Select(r => new WriteResult(r.StatusCode))];
|
||
}
|
||
|
||
// ---- ISubscribable ----
|
||
|
||
public async Task<ISubscriptionHandle> SubscribeAsync(
|
||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var resp = await client.CallAsync<SubscribeRequest, SubscribeResponse>(
|
||
MessageKind.SubscribeRequest,
|
||
new SubscribeRequest
|
||
{
|
||
SessionId = _sessionId,
|
||
TagReferences = [.. fullReferences],
|
||
RequestedIntervalMs = (int)publishingInterval.TotalMilliseconds,
|
||
},
|
||
MessageKind.SubscribeResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host Subscribe failed: {resp.Error}");
|
||
|
||
return new GalaxySubscriptionHandle(resp.SubscriptionId);
|
||
}
|
||
|
||
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var sid = ((GalaxySubscriptionHandle)handle).SubscriptionId;
|
||
await client.SendOneWayAsync(
|
||
MessageKind.UnsubscribeRequest,
|
||
new UnsubscribeRequest { SessionId = _sessionId, SubscriptionId = sid },
|
||
cancellationToken);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Internal entry point used by the IPC client when the Host pushes an
|
||
/// <see cref="MessageKind.OnDataChangeNotification"/> frame. Surfaces it as a managed
|
||
/// <see cref="OnDataChange"/> event.
|
||
/// </summary>
|
||
internal void RaiseDataChange(OnDataChangeNotification notif)
|
||
{
|
||
var handle = new GalaxySubscriptionHandle(notif.SubscriptionId);
|
||
// ISubscribable.OnDataChange fires once per changed attribute — fan out the batch.
|
||
foreach (var v in notif.Values)
|
||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, v.TagReference, ToSnapshot(v)));
|
||
}
|
||
|
||
// ---- IAlarmSource ----
|
||
|
||
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
await client.SendOneWayAsync(
|
||
MessageKind.AlarmSubscribeRequest,
|
||
new AlarmSubscribeRequest { SessionId = _sessionId },
|
||
cancellationToken);
|
||
return new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
|
||
}
|
||
|
||
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||
=> Task.CompletedTask;
|
||
|
||
public async Task AcknowledgeAsync(
|
||
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
foreach (var ack in acknowledgements)
|
||
{
|
||
await client.SendOneWayAsync(
|
||
MessageKind.AlarmAckRequest,
|
||
new AlarmAckRequest
|
||
{
|
||
SessionId = _sessionId,
|
||
EventId = ack.ConditionId,
|
||
Comment = ack.Comment ?? string.Empty,
|
||
},
|
||
cancellationToken);
|
||
}
|
||
}
|
||
|
||
internal void RaiseAlarmEvent(GalaxyAlarmEvent ev)
|
||
{
|
||
var handle = new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
|
||
OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
|
||
SubscriptionHandle: handle,
|
||
SourceNodeId: ev.ObjectTagName,
|
||
ConditionId: ev.EventId,
|
||
AlarmType: ev.AlarmName,
|
||
Message: ev.Message,
|
||
Severity: MapSeverity(ev.Severity),
|
||
SourceTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(ev.UtcUnixMs).UtcDateTime));
|
||
}
|
||
|
||
// ---- IHistoryProvider ----
|
||
|
||
public async Task<HistoryReadResult> ReadRawAsync(
|
||
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||
CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var resp = await client.CallAsync<HistoryReadRequest, HistoryReadResponse>(
|
||
MessageKind.HistoryReadRequest,
|
||
new HistoryReadRequest
|
||
{
|
||
SessionId = _sessionId,
|
||
TagReferences = [fullReference],
|
||
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||
MaxValuesPerTag = maxValuesPerNode,
|
||
},
|
||
MessageKind.HistoryReadResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host HistoryRead failed: {resp.Error}");
|
||
|
||
var first = resp.Tags.FirstOrDefault();
|
||
IReadOnlyList<DataValueSnapshot> samples = first is null
|
||
? Array.Empty<DataValueSnapshot>()
|
||
: [.. first.Values.Select(ToSnapshot)];
|
||
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||
}
|
||
|
||
public async Task<HistoryReadResult> ReadProcessedAsync(
|
||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||
{
|
||
var client = RequireClient();
|
||
var column = MapAggregateToColumn(aggregate);
|
||
|
||
var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
|
||
MessageKind.HistoryReadProcessedRequest,
|
||
new HistoryReadProcessedRequest
|
||
{
|
||
SessionId = _sessionId,
|
||
TagReference = fullReference,
|
||
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||
IntervalMs = (long)interval.TotalMilliseconds,
|
||
AggregateColumn = column,
|
||
},
|
||
MessageKind.HistoryReadProcessedResponse,
|
||
cancellationToken);
|
||
|
||
if (!resp.Success)
|
||
throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");
|
||
|
||
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
|
||
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
|
||
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
|
||
/// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
|
||
/// </summary>
|
||
internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
|
||
{
|
||
HistoryAggregateType.Average => "Average",
|
||
HistoryAggregateType.Minimum => "Minimum",
|
||
HistoryAggregateType.Maximum => "Maximum",
|
||
HistoryAggregateType.Count => "ValueCount",
|
||
HistoryAggregateType.Total => throw new NotSupportedException(
|
||
"HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
|
||
"query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
|
||
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
|
||
};
|
||
|
||
// ---- IRediscoverable ----
|
||
|
||
/// <summary>
|
||
/// Triggered by the IPC client when the Host pushes a deploy-watermark notification
|
||
/// (Galaxy <c>time_of_last_deploy</c> changed per decision #54).
|
||
/// </summary>
|
||
internal void RaiseRediscoveryNeeded(string reason, string? scopeHint = null) =>
|
||
OnRediscoveryNeeded?.Invoke(this, new RediscoveryEventArgs(reason, scopeHint));
|
||
|
||
// ---- IHostConnectivityProbe ----
|
||
|
||
public IReadOnlyList<Core.Abstractions.HostConnectivityStatus> GetHostStatuses() => _hostStatuses;
|
||
|
||
internal void OnHostConnectivityUpdate(IpcHostConnectivityStatus update)
|
||
{
|
||
var translated = new Core.Abstractions.HostConnectivityStatus(
|
||
HostName: update.HostName,
|
||
State: ParseHostState(update.RuntimeStatus),
|
||
LastChangedUtc: DateTimeOffset.FromUnixTimeMilliseconds(update.LastObservedUtcUnixMs).UtcDateTime);
|
||
|
||
var prior = _hostStatuses.FirstOrDefault(h => h.HostName == translated.HostName);
|
||
_hostStatuses = [
|
||
.. _hostStatuses.Where(h => h.HostName != translated.HostName),
|
||
translated
|
||
];
|
||
|
||
if (prior is null || prior.State != translated.State)
|
||
{
|
||
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(
|
||
translated.HostName, prior?.State ?? HostState.Unknown, translated.State));
|
||
}
|
||
}
|
||
|
||
private static HostState ParseHostState(string s) => s switch
|
||
{
|
||
"Running" => HostState.Running,
|
||
"Stopped" => HostState.Stopped,
|
||
"Faulted" => HostState.Faulted,
|
||
_ => HostState.Unknown,
|
||
};
|
||
|
||
// ---- helpers ----
|
||
|
||
private GalaxyIpcClient RequireClient() =>
|
||
_client ?? throw new InvalidOperationException("Driver not initialized");
|
||
|
||
private const uint StatusBadInternalError = 0x80020000u;
|
||
|
||
private static DataValueSnapshot ToSnapshot(GalaxyDataValue v) => new(
|
||
Value: v.ValueBytes,
|
||
StatusCode: v.StatusCode,
|
||
SourceTimestampUtc: v.SourceTimestampUtcUnixMs > 0
|
||
? DateTimeOffset.FromUnixTimeMilliseconds(v.SourceTimestampUtcUnixMs).UtcDateTime
|
||
: null,
|
||
ServerTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(v.ServerTimestampUtcUnixMs).UtcDateTime);
|
||
|
||
private static GalaxyDataValue FromWriteRequest(WriteRequest w) => new()
|
||
{
|
||
TagReference = w.FullReference,
|
||
ValueBytes = MessagePack.MessagePackSerializer.Serialize(w.Value),
|
||
ValueMessagePackType = 0,
|
||
StatusCode = 0,
|
||
SourceTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||
};
|
||
|
||
private static DriverDataType MapDataType(int mxDataType) => mxDataType switch
|
||
{
|
||
0 => DriverDataType.Boolean,
|
||
1 => DriverDataType.Int32,
|
||
2 => DriverDataType.Float32,
|
||
3 => DriverDataType.Float64,
|
||
4 => DriverDataType.String,
|
||
5 => DriverDataType.DateTime,
|
||
_ => DriverDataType.String,
|
||
};
|
||
|
||
private static SecurityClassification MapSecurity(int mxSec) => mxSec switch
|
||
{
|
||
0 => SecurityClassification.FreeAccess,
|
||
1 => SecurityClassification.Operate,
|
||
2 => SecurityClassification.SecuredWrite,
|
||
3 => SecurityClassification.VerifiedWrite,
|
||
4 => SecurityClassification.Tune,
|
||
5 => SecurityClassification.Configure,
|
||
6 => SecurityClassification.ViewOnly,
|
||
_ => SecurityClassification.FreeAccess,
|
||
};
|
||
|
||
private static AlarmSeverity MapSeverity(int sev) => sev switch
|
||
{
|
||
<= 250 => AlarmSeverity.Low,
|
||
<= 500 => AlarmSeverity.Medium,
|
||
<= 800 => AlarmSeverity.High,
|
||
_ => AlarmSeverity.Critical,
|
||
};
|
||
|
||
public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||
}
|
||
|
||
internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle
|
||
{
|
||
public string DiagnosticId => $"galaxy-sub-{SubscriptionId}";
|
||
}
|
||
|
||
internal sealed record GalaxyAlarmSubscriptionHandle(string Id) : IAlarmSubscriptionHandle
|
||
{
|
||
public string DiagnosticId => Id;
|
||
}
|
||
|
||
public sealed class GalaxyProxyOptions
|
||
{
|
||
public required string DriverInstanceId { get; init; }
|
||
public required string PipeName { get; init; }
|
||
public required string SharedSecret { get; init; }
|
||
public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
|
||
}
|