using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
/// <summary>
/// <see cref="IDriver"/> implementation that forwards every capability over the Galaxy IPC
/// channel to the out-of-process Host. Implements the full Phase 2 capability surface;
/// bodies that depend on the deferred Host-side MXAccess code lift will surface an error
/// with code <c>not-implemented</c> until the Host's <c>IGalaxyBackend</c> is wired to the
/// real <c>MxAccessClient</c>.
/// </summary>
public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
    : IDriver,
      ITagDiscovery,
      IReadable,
      IWritable,
      ISubscribable,
      IAlarmSource,
      IHistoryProvider,
      IRediscoverable,
      IHostConnectivityProbe,
      IDisposable
{
    /// <summary>Status code substituted for tags that are missing from a read response.</summary>
    private const uint StatusBadInternalError = 0x80020000u;

    // Connected IPC client; null until InitializeAsync succeeds and again after shutdown/dispose.
    private GalaxyIpcClient? _client;

    // Session id returned by the Host's OpenSession response; sent with every request.
    private long _sessionId;

    private DriverHealth _health = new(DriverState.Unknown, null, null);

    // Latest known status per host name; replaced wholesale on every connectivity update.
    private IReadOnlyList<Core.Abstractions.HostConnectivityStatus> _hostStatuses = [];

    /// <summary>Driver instance id taken from the supplied options.</summary>
    public string DriverInstanceId => options.DriverInstanceId;

    /// <summary>Driver type discriminator; always <c>"Galaxy"</c>.</summary>
    public string DriverType => "Galaxy";

    /// <summary>Raised once per changed value by <see cref="RaiseDataChange"/>.</summary>
    public event EventHandler<DataChangeEventArgs>? OnDataChange;

    /// <summary>Raised by <see cref="RaiseAlarmEvent"/>.</summary>
    public event EventHandler<AlarmEventArgs>? OnAlarmEvent;

    /// <summary>Raised by <see cref="RaiseRediscoveryNeeded"/>.</summary>
    public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;

    /// <summary>Raised by <see cref="OnHostConnectivityUpdate"/> when a host is first seen or its state changes.</summary>
    public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;

    /// <summary>
    /// Connects to the Host pipe and opens a session, forwarding <paramref name="driverConfigJson"/>.
    /// Health moves to Initializing, then Healthy on success or Faulted (with the exception
    /// message) on failure.
    /// </summary>
    /// <exception cref="InvalidOperationException">The Host rejected the OpenSession request.</exception>
    public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        _health = new DriverHealth(DriverState.Initializing, null, null);

        // Held in a local until the session is open so a failed handshake never leaves a
        // connected-but-sessionless client published in _client.
        GalaxyIpcClient? client = null;
        try
        {
            client = await GalaxyIpcClient.ConnectAsync(
                options.PipeName, options.SharedSecret, options.ConnectTimeout, cancellationToken);

            var resp = await client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = DriverInstanceId, DriverConfigJson = driverConfigJson },
                MessageKind.OpenSessionResponse,
                cancellationToken);

            if (!resp.Success)
                throw new InvalidOperationException($"Galaxy.Host OpenSession failed: {resp.Error}");

            _client = client;
            _sessionId = resp.SessionId;
            _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        }
        catch (Exception ex)
        {
            _health = new DriverHealth(DriverState.Faulted, null, ex.Message);

            // Release the pipe connection instead of leaking it; a cleanup failure must not
            // mask the exception that caused initialization to fail.
            if (client is not null)
            {
                try
                {
                    await client.DisposeAsync();
                }
                catch
                {
                    /* cleanup is best effort; the original failure is rethrown below */
                }
            }

            throw;
        }
    }

    /// <summary>Shuts the current session down, then initializes again with the new configuration.</summary>
    public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        await ShutdownAsync(cancellationToken);
        await InitializeAsync(driverConfigJson, cancellationToken);
    }

    /// <summary>
    /// Sends a best-effort CloseSession to the Host and disposes the IPC client. No-op when the
    /// driver is not connected. Health returns to Unknown, keeping the last-successful-read stamp.
    /// </summary>
    public async Task ShutdownAsync(CancellationToken cancellationToken)
    {
        var client = _client;
        if (client is null) return;

        // Detach first so the driver reads as "not initialized" even if disposal throws.
        _client = null;

        try
        {
            await client.SendOneWayAsync(
                MessageKind.CloseSessionRequest,
                new CloseSessionRequest { SessionId = _sessionId },
                cancellationToken);
        }
        catch { /* shutdown is best effort */ }

        try
        {
            await client.DisposeAsync();
        }
        finally
        {
            _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
        }
    }

    /// <summary>Returns the most recently recorded health snapshot.</summary>
    public DriverHealth GetHealth() => _health;

    /// <summary>Always reports 0; the proxy does not track a footprint.</summary>
    public long GetMemoryFootprint() => 0;

    /// <summary>No-op; completes immediately.</summary>
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    // ---- ITagDiscovery ----

    /// <summary>
    /// Requests the object hierarchy from the Host and replays it into <paramref name="builder"/>:
    /// one folder per object (named by its contained name) and one variable per attribute, whose
    /// full reference is <c>TagName.AttributeName</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(builder);
        var client = RequireClient();

        var resp = await client.CallAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
            MessageKind.DiscoverHierarchyRequest,
            new DiscoverHierarchyRequest { SessionId = _sessionId },
            MessageKind.DiscoverHierarchyResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host DiscoverHierarchy failed: {resp.Error}");

        foreach (var obj in resp.Objects)
        {
            var folder = builder.Folder(obj.ContainedName, obj.ContainedName);

            foreach (var attr in obj.Attributes)
            {
                var fullName = $"{obj.TagName}.{attr.AttributeName}";
                var handle = folder.Variable(
                    attr.AttributeName,
                    attr.AttributeName,
                    new DriverAttributeInfo(
                        FullName: fullName,
                        DriverDataType: MapDataType(attr.MxDataType),
                        IsArray: attr.IsArray,
                        ArrayDim: attr.ArrayDim,
                        SecurityClass: MapSecurity(attr.SecurityClassification),
                        IsHistorized: attr.IsHistorized,
                        IsAlarm: attr.IsAlarm));

                // PR 15: when Galaxy flags the attribute as alarm-bearing (AlarmExtension
                // primitive), register an alarm-condition sink so the generic node manager
                // can route OnAlarmEvent payloads for this tag to the concrete address-space
                // builder. Severity default Medium — the live severity arrives through
                // AlarmEventArgs once MxAccessGalaxyBackend's tracker starts firing.
                if (attr.IsAlarm)
                {
                    handle.MarkAsAlarmCondition(new AlarmConditionInfo(
                        SourceName: fullName,
                        InitialSeverity: AlarmSeverity.Medium,
                        InitialDescription: null));
                }
            }
        }
    }

    // ---- IReadable ----

    /// <summary>
    /// Reads the current value of each reference through the Host. The result has one snapshot
    /// per entry of <paramref name="fullReferences"/>, in the same order; references the Host
    /// did not return are reported with a bad-internal-error status and no value.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
        IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(fullReferences);
        var client = RequireClient();

        var resp = await client.CallAsync<ReadValuesRequest, ReadValuesResponse>(
            MessageKind.ReadValuesRequest,
            new ReadValuesRequest { SessionId = _sessionId, TagReferences = [.. fullReferences] },
            MessageKind.ReadValuesResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host ReadValues failed: {resp.Error}");

        // Index by tag reference, keeping the first value for a repeated reference. A plain
        // ToDictionary would throw if the response carried the same tag twice (for example
        // when the caller asked for one tag more than once).
        var byRef = new Dictionary<string, GalaxyDataValue>(StringComparer.Ordinal);
        foreach (var value in resp.Values)
            byRef.TryAdd(value.TagReference, value);

        var result = new DataValueSnapshot[fullReferences.Count];
        for (var i = 0; i < result.Length; i++)
        {
            result[i] = byRef.TryGetValue(fullReferences[i], out var found)
                ? ToSnapshot(found)
                : new DataValueSnapshot(null, StatusBadInternalError, null, DateTime.UtcNow);
        }

        return result;
    }

    // ---- IWritable ----

    /// <summary>
    /// Sends the writes to the Host and maps each returned result to a <see cref="WriteResult"/>
    /// carrying its status code. No response-level success flag is inspected here; the outcome
    /// is conveyed only through the per-write status codes.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized.</exception>
    public async Task<IReadOnlyList<WriteResult>> WriteAsync(
        IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(writes);
        var client = RequireClient();

        var resp = await client.CallAsync<WriteValuesRequest, WriteValuesResponse>(
            MessageKind.WriteValuesRequest,
            new WriteValuesRequest
            {
                SessionId = _sessionId,
                Writes = [.. writes.Select(FromWriteRequest)],
            },
            MessageKind.WriteValuesResponse,
            cancellationToken);

        return [.. resp.Results.Select(r => new WriteResult(r.StatusCode))];
    }

    // ---- ISubscribable ----

    /// <summary>
    /// Asks the Host to subscribe to the given references at the requested interval (sent as
    /// whole milliseconds) and returns a handle wrapping the Host-assigned subscription id.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<ISubscriptionHandle> SubscribeAsync(
        IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(fullReferences);
        var client = RequireClient();

        var resp = await client.CallAsync<SubscribeRequest, SubscribeResponse>(
            MessageKind.SubscribeRequest,
            new SubscribeRequest
            {
                SessionId = _sessionId,
                TagReferences = [.. fullReferences],
                RequestedIntervalMs = (int)publishingInterval.TotalMilliseconds,
            },
            MessageKind.SubscribeResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host Subscribe failed: {resp.Error}");

        return new GalaxySubscriptionHandle(resp.SubscriptionId);
    }

    /// <summary>
    /// Sends a one-way unsubscribe for a handle previously returned by <see cref="SubscribeAsync"/>.
    /// No response is awaited from the Host.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized.</exception>
    /// <exception cref="InvalidCastException"><paramref name="handle"/> was not issued by this driver type.</exception>
    public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(handle);
        var client = RequireClient();
        var subscriptionId = ((GalaxySubscriptionHandle)handle).SubscriptionId;

        await client.SendOneWayAsync(
            MessageKind.UnsubscribeRequest,
            new UnsubscribeRequest { SessionId = _sessionId, SubscriptionId = subscriptionId },
            cancellationToken);
    }

    /// <summary>
    /// Internal entry point used by the IPC client when the Host pushes an
    /// <see cref="OnDataChangeNotification"/> frame. Surfaces it as managed
    /// <see cref="OnDataChange"/> events.
    /// </summary>
    internal void RaiseDataChange(OnDataChangeNotification notif)
    {
        var handle = new GalaxySubscriptionHandle(notif.SubscriptionId);

        // ISubscribable.OnDataChange fires once per changed attribute — fan out the batch.
        foreach (var v in notif.Values)
            OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, v.TagReference, ToSnapshot(v)));
    }

    // ---- IAlarmSource ----

    /// <summary>
    /// Sends a one-way alarm-subscribe request for the current session and returns a handle
    /// named <c>alarm-{sessionId}</c>. <paramref name="sourceNodeIds"/> is not forwarded: the
    /// request carries only the session id.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized.</exception>
    public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
        IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        await client.SendOneWayAsync(
            MessageKind.AlarmSubscribeRequest,
            new AlarmSubscribeRequest { SessionId = _sessionId },
            cancellationToken);

        return new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
    }

    /// <summary>No-op; nothing is sent to the Host and the task completes immediately.</summary>
    public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <summary>
    /// Sends one one-way acknowledge request per entry, in order, using the entry's condition id
    /// as the event id and an empty string when no comment was supplied.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized.</exception>
    public async Task AcknowledgeAsync(
        IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(acknowledgements);
        var client = RequireClient();

        foreach (var ack in acknowledgements)
        {
            await client.SendOneWayAsync(
                MessageKind.AlarmAckRequest,
                new AlarmAckRequest
                {
                    SessionId = _sessionId,
                    EventId = ack.ConditionId,
                    Comment = ack.Comment ?? string.Empty,
                },
                cancellationToken);
        }
    }

    /// <summary>
    /// Translates a Host alarm event into <see cref="AlarmEventArgs"/> and raises
    /// <see cref="OnAlarmEvent"/>, using the same <c>alarm-{sessionId}</c> handle id that
    /// <see cref="SubscribeAlarmsAsync"/> hands out.
    /// </summary>
    internal void RaiseAlarmEvent(GalaxyAlarmEvent ev)
    {
        var handle = new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");

        OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
            SubscriptionHandle: handle,
            SourceNodeId: ev.ObjectTagName,
            ConditionId: ev.EventId,
            AlarmType: ev.AlarmName,
            Message: ev.Message,
            Severity: MapSeverity(ev.Severity),
            SourceTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(ev.UtcUnixMs).UtcDateTime));
    }

    // ---- IHistoryProvider ----

    /// <summary>
    /// Reads raw history for a single tag between <paramref name="startUtc"/> and
    /// <paramref name="endUtc"/>. Only the first tag entry of the response is used; an empty
    /// response yields an empty sample list. No continuation point is produced.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<HistoryReadResult> ReadRawAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
        CancellationToken cancellationToken)
    {
        var client = RequireClient();

        var resp = await client.CallAsync<HistoryReadRequest, HistoryReadResponse>(
            MessageKind.HistoryReadRequest,
            new HistoryReadRequest
            {
                SessionId = _sessionId,
                TagReferences = [fullReference],
                StartUtcUnixMs = ToUnixMs(startUtc),
                EndUtcUnixMs = ToUnixMs(endUtc),
                MaxValuesPerTag = maxValuesPerNode,
            },
            MessageKind.HistoryReadResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryRead failed: {resp.Error}");

        var first = resp.Tags.FirstOrDefault();
        IReadOnlyList<DataValueSnapshot> samples = first is null
            ? Array.Empty<DataValueSnapshot>()
            : [.. first.Values.Select(ToSnapshot)];

        return new HistoryReadResult(samples, ContinuationPoint: null);
    }

    /// <summary>
    /// Reads aggregated history for a single tag. The aggregate is translated to a column name
    /// by <see cref="MapAggregateToColumn"/> before the request is sent. No continuation point
    /// is produced.
    /// </summary>
    /// <exception cref="NotSupportedException">The aggregate has no column mapping.</exception>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<HistoryReadResult> ReadProcessedAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
        HistoryAggregateType aggregate, CancellationToken cancellationToken)
    {
        var client = RequireClient();
        var column = MapAggregateToColumn(aggregate);

        var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
            MessageKind.HistoryReadProcessedRequest,
            new HistoryReadProcessedRequest
            {
                SessionId = _sessionId,
                TagReference = fullReference,
                StartUtcUnixMs = ToUnixMs(startUtc),
                EndUtcUnixMs = ToUnixMs(endUtc),
                IntervalMs = (long)interval.TotalMilliseconds,
                AggregateColumn = column,
            },
            MessageKind.HistoryReadProcessedResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");

        IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
        return new HistoryReadResult(samples, ContinuationPoint: null);
    }

    /// <summary>
    /// Reads history for a single tag at the given timestamps. The response values are returned
    /// as-is, in the order the Host supplied them. No continuation point is produced.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<HistoryReadResult> ReadAtTimeAsync(
        string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(timestampsUtc);
        var client = RequireClient();

        var resp = await client.CallAsync<HistoryReadAtTimeRequest, HistoryReadAtTimeResponse>(
            MessageKind.HistoryReadAtTimeRequest,
            new HistoryReadAtTimeRequest
            {
                SessionId = _sessionId,
                TagReference = fullReference,
                TimestampsUtcUnixMs = [.. timestampsUtc.Select(ToUnixMs)],
            },
            MessageKind.HistoryReadAtTimeResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryReadAtTime failed: {resp.Error}");

        // ReadAtTime returns one sample per requested timestamp in the same order — the Host
        // pads with bad-quality snapshots when a timestamp can't be interpolated, so response
        // length matches request length exactly. We trust that contract rather than
        // re-aligning here, because the Host is the source-of-truth for interpolation policy.
        IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
        return new HistoryReadResult(samples, ContinuationPoint: null);
    }

    /// <summary>
    /// Reads historical events, optionally filtered by <paramref name="sourceName"/>, and maps
    /// each one through <see cref="ToHistoricalEvent"/>. No continuation point is produced.
    /// </summary>
    /// <exception cref="InvalidOperationException">The driver is not initialized, or the Host reported failure.</exception>
    public async Task<HistoricalEventsResult> ReadEventsAsync(
        string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        var resp = await client.CallAsync<HistoryReadEventsRequest, HistoryReadEventsResponse>(
            MessageKind.HistoryReadEventsRequest,
            new HistoryReadEventsRequest
            {
                SessionId = _sessionId,
                SourceName = sourceName,
                StartUtcUnixMs = ToUnixMs(startUtc),
                EndUtcUnixMs = ToUnixMs(endUtc),
                MaxEvents = maxEvents,
            },
            MessageKind.HistoryReadEventsResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryReadEvents failed: {resp.Error}");

        IReadOnlyList<HistoricalEvent> events = [.. resp.Events.Select(ToHistoricalEvent)];
        return new HistoricalEventsResult(events, ContinuationPoint: null);
    }

    /// <summary>
    /// Converts a wire-format historical event to the driver-facing record, turning the Unix
    /// millisecond stamps into UTC <see cref="DateTime"/> values and using the wire display text
    /// as the message.
    /// </summary>
    internal static HistoricalEvent ToHistoricalEvent(GalaxyHistoricalEvent wire) => new(
        EventId: wire.EventId,
        SourceName: wire.SourceName,
        EventTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.EventTimeUtcUnixMs).UtcDateTime,
        ReceivedTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.ReceivedTimeUtcUnixMs).UtcDateTime,
        Message: wire.DisplayText,
        Severity: wire.Severity);

    /// <summary>
    /// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
    /// AnalogSummaryQuery column names consumed by HistorianDataSource.ReadAggregateAsync.
    /// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// <paramref name="aggregate"/> is <see cref="HistoryAggregateType.Total"/> or an unknown value.
    /// </exception>
    internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
    {
        HistoryAggregateType.Average => "Average",
        HistoryAggregateType.Minimum => "Minimum",
        HistoryAggregateType.Maximum => "Maximum",
        HistoryAggregateType.Count => "ValueCount",
        HistoryAggregateType.Total => throw new NotSupportedException(
            "HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
            "query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
        _ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
    };

    // ---- IRediscoverable ----

    /// <summary>
    /// Triggered by the IPC client when the Host pushes a deploy-watermark notification
    /// (Galaxy <c>time_of_last_deploy</c> changed per decision #54).
    /// </summary>
    internal void RaiseRediscoveryNeeded(string reason, string? scopeHint = null) =>
        OnRediscoveryNeeded?.Invoke(this, new RediscoveryEventArgs(reason, scopeHint));

    // ---- IHostConnectivityProbe ----

    /// <summary>Returns the current per-host status list.</summary>
    public IReadOnlyList<Core.Abstractions.HostConnectivityStatus> GetHostStatuses() => _hostStatuses;

    /// <summary>
    /// Applies a connectivity update from the Host: replaces any existing entry for the same host
    /// name and raises <see cref="OnHostStatusChanged"/> when the host is new or its state
    /// differs from the previous entry. A new host is reported as transitioning from Unknown.
    /// </summary>
    internal void OnHostConnectivityUpdate(IpcHostConnectivityStatus update)
    {
        var translated = new Core.Abstractions.HostConnectivityStatus(
            HostName: update.HostName,
            State: ParseHostState(update.RuntimeStatus),
            LastChangedUtc: DateTimeOffset.FromUnixTimeMilliseconds(update.LastObservedUtcUnixMs).UtcDateTime);

        var prior = _hostStatuses.FirstOrDefault(h => h.HostName == translated.HostName);

        // Build a fresh list rather than mutating, so callers holding the old list are unaffected.
        _hostStatuses = [
            .. _hostStatuses.Where(h => h.HostName != translated.HostName),
            translated
        ];

        if (prior is null || prior.State != translated.State)
        {
            OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(
                translated.HostName, prior?.State ?? HostState.Unknown, translated.State));
        }
    }

    /// <summary>Maps the Host's runtime-status string to <see cref="HostState"/>; unrecognized text maps to Unknown.</summary>
    private static HostState ParseHostState(string s) => s switch
    {
        "Running" => HostState.Running,
        "Stopped" => HostState.Stopped,
        "Faulted" => HostState.Faulted,
        _ => HostState.Unknown,
    };

    // ---- helpers ----

    /// <summary>Returns the connected client or throws when the driver has not been initialized.</summary>
    private GalaxyIpcClient RequireClient() =>
        _client ?? throw new InvalidOperationException("Driver not initialized");

    /// <summary>
    /// Converts a timestamp to Unix milliseconds. Values of kind Utc or Unspecified are taken as
    /// UTC; values of kind Local are converted to UTC first, because
    /// <c>new DateTimeOffset(local, TimeSpan.Zero)</c> throws unless the machine's offset is zero.
    /// </summary>
    private static long ToUnixMs(DateTime value)
    {
        var utc = value.Kind == DateTimeKind.Local ? value.ToUniversalTime() : value;
        return new DateTimeOffset(utc, TimeSpan.Zero).ToUnixTimeMilliseconds();
    }

    /// <summary>
    /// Converts a wire value to a snapshot. The value is passed through as the raw value bytes;
    /// a non-positive source timestamp is reported as <c>null</c>.
    /// </summary>
    private static DataValueSnapshot ToSnapshot(GalaxyDataValue v) => new(
        Value: v.ValueBytes,
        StatusCode: v.StatusCode,
        SourceTimestampUtc: v.SourceTimestampUtcUnixMs > 0
            ? DateTimeOffset.FromUnixTimeMilliseconds(v.SourceTimestampUtcUnixMs).UtcDateTime
            : null,
        ServerTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(v.ServerTimestampUtcUnixMs).UtcDateTime);

    /// <summary>
    /// Builds the wire value for a write: the value is MessagePack-serialized, the status code
    /// and type tag are zero, and both timestamps carry the same "now" instant.
    /// </summary>
    private static GalaxyDataValue FromWriteRequest(WriteRequest w)
    {
        // Sample the clock once so source and server stamps cannot disagree.
        var nowUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        return new GalaxyDataValue
        {
            TagReference = w.FullReference,
            ValueBytes = MessagePack.MessagePackSerializer.Serialize(w.Value),
            ValueMessagePackType = 0,
            StatusCode = 0,
            SourceTimestampUtcUnixMs = nowUnixMs,
            ServerTimestampUtcUnixMs = nowUnixMs,
        };
    }

    /// <summary>Maps the Host's numeric data-type code to a driver data type; unknown codes fall back to String.</summary>
    private static DriverDataType MapDataType(int mxDataType) => mxDataType switch
    {
        0 => DriverDataType.Boolean,
        1 => DriverDataType.Int32,
        2 => DriverDataType.Float32,
        3 => DriverDataType.Float64,
        4 => DriverDataType.String,
        5 => DriverDataType.DateTime,
        _ => DriverDataType.String,
    };

    /// <summary>Maps the Host's numeric security code to a classification; unknown codes fall back to FreeAccess.</summary>
    private static SecurityClassification MapSecurity(int mxSec) => mxSec switch
    {
        0 => SecurityClassification.FreeAccess,
        1 => SecurityClassification.Operate,
        2 => SecurityClassification.SecuredWrite,
        3 => SecurityClassification.VerifiedWrite,
        4 => SecurityClassification.Tune,
        5 => SecurityClassification.Configure,
        6 => SecurityClassification.ViewOnly,
        _ => SecurityClassification.FreeAccess,
    };

    /// <summary>
    /// Buckets a numeric severity: up to 250 is Low, up to 500 Medium, up to 800 High, and
    /// anything above is Critical.
    /// </summary>
    private static AlarmSeverity MapSeverity(int sev) => sev switch
    {
        <= 250 => AlarmSeverity.Low,
        <= 500 => AlarmSeverity.Medium,
        <= 800 => AlarmSeverity.High,
        _ => AlarmSeverity.Critical,
    };

    /// <summary>
    /// Synchronously disposes the IPC client, if any. Safe to call more than once: the client
    /// reference is cleared before disposal so repeat calls do nothing.
    /// </summary>
    public void Dispose()
    {
        var client = _client;
        if (client is null) return;

        _client = null;

        // IDisposable has no async form, so the asynchronous disposal is waited on here.
        client.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}
/// <summary>
/// Subscription handle that wraps a numeric subscription id.
/// </summary>
/// <param name="SubscriptionId">Identifier of the subscription this handle refers to.</param>
internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle
{
    /// <summary>Human-readable id of the form <c>galaxy-sub-{SubscriptionId}</c>.</summary>
    public string DiagnosticId
    {
        get { return $"galaxy-sub-{SubscriptionId}"; }
    }
}
/// <summary>
/// Alarm-subscription handle identified by an opaque string id.
/// </summary>
/// <param name="Id">Identifier of the alarm subscription; also used as the diagnostic id.</param>
internal sealed record GalaxyAlarmSubscriptionHandle(string Id) : IAlarmSubscriptionHandle
{
    /// <summary>Returns <see cref="Id"/> unchanged.</summary>
    public string DiagnosticId
    {
        get { return Id; }
    }
}
/// <summary>
/// Settings for a Galaxy proxy driver instance. The three string properties must be supplied
/// at construction; the connect timeout is optional.
/// </summary>
public sealed class GalaxyProxyOptions
{
    /// <summary>Identifier of the driver instance these options belong to.</summary>
    public required string DriverInstanceId { get; init; }

    /// <summary>Name of the pipe used to reach the Host.</summary>
    public required string PipeName { get; init; }

    /// <summary>Shared secret passed when connecting to the Host.</summary>
    public required string SharedSecret { get; init; }

    /// <summary>Timeout applied when connecting; defaults to 10 seconds.</summary>
    public TimeSpan ConnectTimeout { get; init; } = new TimeSpan(hours: 0, minutes: 0, seconds: 10);
}