lmxopcua/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs
Joseph Doherty 190d09cdeb Phase 3 PR 15 — alarm-condition contract in IAddressSpaceBuilder + wire OnAlarmEvent through GenericDriverNodeManager. IAddressSpaceBuilder.IVariableHandle gains MarkAsAlarmCondition(AlarmConditionInfo) which returns an IAlarmConditionSink. AlarmConditionInfo carries SourceName/InitialSeverity/InitialDescription. Concrete address-space builders (the upcoming PR 16 OPC UA server backend) materialize a sibling AlarmConditionState node on the first call; the sink receives every lifecycle transition the generic node manager forwards. GenericDriverNodeManager gains a CapturingBuilder wrapper that transparently wraps every Folder/Variable call — the wrapper observes MarkAsAlarmCondition calls without participating in materialization, captures the resulting IAlarmConditionSink into an internal source-node-id → sink ConcurrentDictionary keyed by IVariableHandle.FullReference. After DiscoverAsync completes, if the driver implements IAlarmSource the node manager subscribes to OnAlarmEvent and routes every AlarmEventArgs to the sink registered for args.SourceNodeId — unknown source ids are dropped silently (may belong to another driver or to a variable the builder chose not to flag). Dispose unsubscribes the forwarder to prevent dangling invocation-list references across node-manager rebuilds. GalaxyProxyDriver.DiscoverAsync now calls handle.MarkAsAlarmCondition(new AlarmConditionInfo(fullName, AlarmSeverity.Medium, null)) on every attr.IsAlarm=true variable — severity seed is Medium because the live Priority byte arrives through the subsequent GalaxyAlarmEvent stream (which PR 14's GalaxyAlarmTracker now emits); the Admin UI sees the severity update on the first transition. RecordingAddressSpaceBuilder in Driver.Galaxy.E2E gains a RecordedAlarmCondition list + a RecordingSink implementation that captures AlarmEventArgs for test assertion — the E2E parity suite can now verify alarm-condition registration shape in addition to folder/variable shape. 
Tests (4 new GenericDriverNodeManagerTests): Alarm_events_are_routed_to_the_sink_registered_for_the_matching_source_node_id — 2 alarms registered (Tank.HiHi + Heater.OverTemp), driver raises an event for Tank.HiHi, the Tank.HiHi sink captures the payload, the Heater.OverTemp sink does not (tag-scoped fan-out, not broadcast); Non_alarm_variables_do_not_register_sinks — plain Tank.Level in the same discover is not in TrackedAlarmSources; Unknown_source_node_id_is_dropped_silently — a transition for Unknown.Source doesn't reach any sink + no exception; Dispose_unsubscribes_from_OnAlarmEvent — post-dispose, a transition for a previously-registered tag is no-op because the forwarder detached. InternalsVisibleTo('ZB.MOM.WW.OtOpcUa.Core.Tests') added to Core csproj so TrackedAlarmSources internal property is visible to the test. Full solution: 0 errors, 152 unit tests pass (8 Core + 14 Proxy + 14 Admin + 24 Configuration + 6 Shared + 84 Galaxy.Host + 2 Server). PR 16 will implement the concrete OPC UA address-space builder that materializes AlarmConditionState from this contract.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:51:35 -04:00
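
The routing described in the commit message (a source-node-id → sink map, silent drop of unknown ids, unsubscribe on dispose) can be sketched roughly as follows. This is a minimal illustration, not the actual GenericDriverNodeManager: `AlarmEvent`, `IAlarmSink`, `ListSink`, `AlarmSource` and `AlarmRouter` are hypothetical stand-ins invented for this example, and the real `IAlarmConditionSink` / `AlarmEventArgs` shapes may differ.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for AlarmEventArgs; only the routing key matters here.
public sealed record AlarmEvent(string SourceNodeId, string Message);

// Hypothetical stand-in for IAlarmConditionSink.
public interface IAlarmSink
{
    void OnTransition(AlarmEvent ev);
}

// Test-style sink that simply remembers what it was handed.
public sealed class ListSink : IAlarmSink
{
    public List<AlarmEvent> Received { get; } = new();
    public void OnTransition(AlarmEvent ev) => Received.Add(ev);
}

// Hypothetical stand-in for a driver that implements IAlarmSource.
public sealed class AlarmSource
{
    public event EventHandler<AlarmEvent>? OnAlarmEvent;
    public void Raise(AlarmEvent ev) => OnAlarmEvent?.Invoke(this, ev);
}

// Routes each event to the single sink registered for its source id.
public sealed class AlarmRouter : IDisposable
{
    private readonly ConcurrentDictionary<string, IAlarmSink> _sinks =
        new(StringComparer.Ordinal);
    private readonly AlarmSource _source;

    public AlarmRouter(AlarmSource source)
    {
        _source = source;
        _source.OnAlarmEvent += Forward;
    }

    public void Register(string sourceNodeId, IAlarmSink sink) =>
        _sinks[sourceNodeId] = sink;

    private void Forward(object? sender, AlarmEvent ev)
    {
        // Unknown ids are dropped silently: no exception, no broadcast.
        if (_sinks.TryGetValue(ev.SourceNodeId, out var sink))
            sink.OnTransition(ev);
    }

    // Detach the forwarder so the source holds no reference to this router.
    public void Dispose() => _source.OnAlarmEvent -= Forward;
}
```

With two sinks registered for `Tank.HiHi` and `Heater.OverTemp`, raising an event for `Tank.HiHi` reaches only the first sink, and an event raised after `Dispose()` reaches neither, which mirrors the four test cases listed in the commit message.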

476 lines
19 KiB
C#
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;

/// <summary>
/// <see cref="IDriver"/> implementation that forwards every capability over the Galaxy IPC
/// channel to the out-of-process Host. Implements the full Phase 2 capability surface;
/// bodies that depend on the deferred Host-side MXAccess code lift will surface
/// <see cref="GalaxyIpcException"/> with code <c>not-implemented</c> until the Host's
/// <c>IGalaxyBackend</c> is wired to the real <c>MxAccessClient</c>.
/// </summary>
public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
    : IDriver,
      ITagDiscovery,
      IReadable,
      IWritable,
      ISubscribable,
      IAlarmSource,
      IHistoryProvider,
      IRediscoverable,
      IHostConnectivityProbe,
      IDisposable
{
    private GalaxyIpcClient? _client;
    private long _sessionId;
    private DriverHealth _health = new(DriverState.Unknown, null, null);
    private IReadOnlyList<Core.Abstractions.HostConnectivityStatus> _hostStatuses = [];

    public string DriverInstanceId => options.DriverInstanceId;
    public string DriverType => "Galaxy";

    public event EventHandler<DataChangeEventArgs>? OnDataChange;
    public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
    public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;
    public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;

    public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        _health = new DriverHealth(DriverState.Initializing, null, null);
        try
        {
            _client = await GalaxyIpcClient.ConnectAsync(
                options.PipeName, options.SharedSecret, options.ConnectTimeout, cancellationToken);

            var resp = await _client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = DriverInstanceId, DriverConfigJson = driverConfigJson },
                MessageKind.OpenSessionResponse,
                cancellationToken);

            if (!resp.Success)
                throw new InvalidOperationException($"Galaxy.Host OpenSession failed: {resp.Error}");

            _sessionId = resp.SessionId;
            _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        }
        catch (Exception ex)
        {
            _health = new DriverHealth(DriverState.Faulted, null, ex.Message);
            throw;
        }
    }

    public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        await ShutdownAsync(cancellationToken);
        await InitializeAsync(driverConfigJson, cancellationToken);
    }

    public async Task ShutdownAsync(CancellationToken cancellationToken)
    {
        if (_client is null) return;

        try
        {
            await _client.SendOneWayAsync(
                MessageKind.CloseSessionRequest,
                new CloseSessionRequest { SessionId = _sessionId },
                cancellationToken);
        }
        catch { /* shutdown is best effort */ }

        await _client.DisposeAsync();
        _client = null;
        _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
    }

    public DriverHealth GetHealth() => _health;
    public long GetMemoryFootprint() => 0;
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    // ---- ITagDiscovery ----

    public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(builder);
        var client = RequireClient();

        var resp = await client.CallAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
            MessageKind.DiscoverHierarchyRequest,
            new DiscoverHierarchyRequest { SessionId = _sessionId },
            MessageKind.DiscoverHierarchyResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host DiscoverHierarchy failed: {resp.Error}");

        foreach (var obj in resp.Objects)
        {
            var folder = builder.Folder(obj.ContainedName, obj.ContainedName);
            foreach (var attr in obj.Attributes)
            {
                var fullName = $"{obj.TagName}.{attr.AttributeName}";
                var handle = folder.Variable(
                    attr.AttributeName,
                    attr.AttributeName,
                    new DriverAttributeInfo(
                        FullName: fullName,
                        DriverDataType: MapDataType(attr.MxDataType),
                        IsArray: attr.IsArray,
                        ArrayDim: attr.ArrayDim,
                        SecurityClass: MapSecurity(attr.SecurityClassification),
                        IsHistorized: attr.IsHistorized,
                        IsAlarm: attr.IsAlarm));

                // PR 15: when Galaxy flags the attribute as alarm-bearing (AlarmExtension
                // primitive), register an alarm-condition sink so the generic node manager
                // can route OnAlarmEvent payloads for this tag to the concrete address-space
                // builder. Severity defaults to Medium; the live severity arrives through
                // AlarmEventArgs once MxAccessGalaxyBackend's tracker starts firing.
                if (attr.IsAlarm)
                {
                    handle.MarkAsAlarmCondition(new AlarmConditionInfo(
                        SourceName: fullName,
                        InitialSeverity: AlarmSeverity.Medium,
                        InitialDescription: null));
                }
            }
        }
    }

    // ---- IReadable ----

    public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
        IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        var resp = await client.CallAsync<ReadValuesRequest, ReadValuesResponse>(
            MessageKind.ReadValuesRequest,
            new ReadValuesRequest { SessionId = _sessionId, TagReferences = [.. fullReferences] },
            MessageKind.ReadValuesResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host ReadValues failed: {resp.Error}");

        var byRef = resp.Values.ToDictionary(v => v.TagReference);
        var result = new DataValueSnapshot[fullReferences.Count];
        for (var i = 0; i < fullReferences.Count; i++)
        {
            result[i] = byRef.TryGetValue(fullReferences[i], out var v)
                ? ToSnapshot(v)
                : new DataValueSnapshot(null, StatusBadInternalError, null, DateTime.UtcNow);
        }
        return result;
    }

    // ---- IWritable ----

    public async Task<IReadOnlyList<WriteResult>> WriteAsync(
        IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        var resp = await client.CallAsync<WriteValuesRequest, WriteValuesResponse>(
            MessageKind.WriteValuesRequest,
            new WriteValuesRequest
            {
                SessionId = _sessionId,
                Writes = [.. writes.Select(FromWriteRequest)],
            },
            MessageKind.WriteValuesResponse,
            cancellationToken);

        return [.. resp.Results.Select(r => new WriteResult(r.StatusCode))];
    }

    // ---- ISubscribable ----

    public async Task<ISubscriptionHandle> SubscribeAsync(
        IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        var resp = await client.CallAsync<SubscribeRequest, SubscribeResponse>(
            MessageKind.SubscribeRequest,
            new SubscribeRequest
            {
                SessionId = _sessionId,
                TagReferences = [.. fullReferences],
                RequestedIntervalMs = (int)publishingInterval.TotalMilliseconds,
            },
            MessageKind.SubscribeResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host Subscribe failed: {resp.Error}");

        return new GalaxySubscriptionHandle(resp.SubscriptionId);
    }

    public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
    {
        var client = RequireClient();
        var sid = ((GalaxySubscriptionHandle)handle).SubscriptionId;

        await client.SendOneWayAsync(
            MessageKind.UnsubscribeRequest,
            new UnsubscribeRequest { SessionId = _sessionId, SubscriptionId = sid },
            cancellationToken);
    }

    /// <summary>
    /// Internal entry point used by the IPC client when the Host pushes an
    /// <see cref="MessageKind.OnDataChangeNotification"/> frame. Surfaces it as a managed
    /// <see cref="OnDataChange"/> event.
    /// </summary>
    internal void RaiseDataChange(OnDataChangeNotification notif)
    {
        var handle = new GalaxySubscriptionHandle(notif.SubscriptionId);
        // ISubscribable.OnDataChange fires once per changed attribute, so fan out the batch.
        foreach (var v in notif.Values)
            OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, v.TagReference, ToSnapshot(v)));
    }

    // ---- IAlarmSource ----

    public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
        IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        // The request carries only the session id; sourceNodeIds is not forwarded to the Host.
        await client.SendOneWayAsync(
            MessageKind.AlarmSubscribeRequest,
            new AlarmSubscribeRequest { SessionId = _sessionId },
            cancellationToken);

        return new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
    }

    public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
        => Task.CompletedTask;

    public async Task AcknowledgeAsync(
        IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
    {
        var client = RequireClient();

        foreach (var ack in acknowledgements)
        {
            await client.SendOneWayAsync(
                MessageKind.AlarmAckRequest,
                new AlarmAckRequest
                {
                    SessionId = _sessionId,
                    EventId = ack.ConditionId,
                    Comment = ack.Comment ?? string.Empty,
                },
                cancellationToken);
        }
    }

    internal void RaiseAlarmEvent(GalaxyAlarmEvent ev)
    {
        var handle = new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
        // Sinks are registered in DiscoverAsync under "{TagName}.{AttributeName}"; routing by
        // SourceNodeId therefore assumes ev.ObjectTagName carries that same reference form.
        OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
            SubscriptionHandle: handle,
            SourceNodeId: ev.ObjectTagName,
            ConditionId: ev.EventId,
            AlarmType: ev.AlarmName,
            Message: ev.Message,
            Severity: MapSeverity(ev.Severity),
            SourceTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(ev.UtcUnixMs).UtcDateTime));
    }

    // ---- IHistoryProvider ----

    public async Task<HistoryReadResult> ReadRawAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
        CancellationToken cancellationToken)
    {
        var client = RequireClient();

        // startUtc/endUtc must be Utc or Unspecified kind: new DateTimeOffset(dt, TimeSpan.Zero)
        // throws ArgumentException for a Local-kind value unless the machine's offset is zero.
        var resp = await client.CallAsync<HistoryReadRequest, HistoryReadResponse>(
            MessageKind.HistoryReadRequest,
            new HistoryReadRequest
            {
                SessionId = _sessionId,
                TagReferences = [fullReference],
                StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
                EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
                MaxValuesPerTag = maxValuesPerNode,
            },
            MessageKind.HistoryReadResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryRead failed: {resp.Error}");

        var first = resp.Tags.FirstOrDefault();
        IReadOnlyList<DataValueSnapshot> samples = first is null
            ? Array.Empty<DataValueSnapshot>()
            : [.. first.Values.Select(ToSnapshot)];
        return new HistoryReadResult(samples, ContinuationPoint: null);
    }

    public async Task<HistoryReadResult> ReadProcessedAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
        HistoryAggregateType aggregate, CancellationToken cancellationToken)
    {
        var client = RequireClient();
        var column = MapAggregateToColumn(aggregate);

        // Same DateTime.Kind requirement as ReadRawAsync applies to startUtc/endUtc.
        var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
            MessageKind.HistoryReadProcessedRequest,
            new HistoryReadProcessedRequest
            {
                SessionId = _sessionId,
                TagReference = fullReference,
                StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
                EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
                IntervalMs = (long)interval.TotalMilliseconds,
                AggregateColumn = column,
            },
            MessageKind.HistoryReadProcessedResponse,
            cancellationToken);

        if (!resp.Success)
            throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");

        IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
        return new HistoryReadResult(samples, ContinuationPoint: null);
    }

    /// <summary>
    /// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
    /// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
    /// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
    /// </summary>
    internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
    {
        HistoryAggregateType.Average => "Average",
        HistoryAggregateType.Minimum => "Minimum",
        HistoryAggregateType.Maximum => "Maximum",
        HistoryAggregateType.Count => "ValueCount",
        HistoryAggregateType.Total => throw new NotSupportedException(
            "HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
            "query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
        _ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
    };

    // ---- IRediscoverable ----

    /// <summary>
    /// Triggered by the IPC client when the Host pushes a deploy-watermark notification
    /// (Galaxy <c>time_of_last_deploy</c> changed per decision #54).
    /// </summary>
    internal void RaiseRediscoveryNeeded(string reason, string? scopeHint = null) =>
        OnRediscoveryNeeded?.Invoke(this, new RediscoveryEventArgs(reason, scopeHint));

    // ---- IHostConnectivityProbe ----

    public IReadOnlyList<Core.Abstractions.HostConnectivityStatus> GetHostStatuses() => _hostStatuses;

    internal void OnHostConnectivityUpdate(IpcHostConnectivityStatus update)
    {
        var translated = new Core.Abstractions.HostConnectivityStatus(
            HostName: update.HostName,
            State: ParseHostState(update.RuntimeStatus),
            LastChangedUtc: DateTimeOffset.FromUnixTimeMilliseconds(update.LastObservedUtcUnixMs).UtcDateTime);

        var prior = _hostStatuses.FirstOrDefault(h => h.HostName == translated.HostName);
        _hostStatuses = [
            .. _hostStatuses.Where(h => h.HostName != translated.HostName),
            translated
        ];

        if (prior is null || prior.State != translated.State)
        {
            OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(
                translated.HostName, prior?.State ?? HostState.Unknown, translated.State));
        }
    }

    private static HostState ParseHostState(string s) => s switch
    {
        "Running" => HostState.Running,
        "Stopped" => HostState.Stopped,
        "Faulted" => HostState.Faulted,
        _ => HostState.Unknown,
    };

    // ---- helpers ----

    private GalaxyIpcClient RequireClient() =>
        _client ?? throw new InvalidOperationException("Driver not initialized");

    private const uint StatusBadInternalError = 0x80020000u;

    private static DataValueSnapshot ToSnapshot(GalaxyDataValue v) => new(
        Value: v.ValueBytes,
        StatusCode: v.StatusCode,
        SourceTimestampUtc: v.SourceTimestampUtcUnixMs > 0
            ? DateTimeOffset.FromUnixTimeMilliseconds(v.SourceTimestampUtcUnixMs).UtcDateTime
            : null,
        ServerTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(v.ServerTimestampUtcUnixMs).UtcDateTime);

    private static GalaxyDataValue FromWriteRequest(WriteRequest w) => new()
    {
        TagReference = w.FullReference,
        ValueBytes = MessagePack.MessagePackSerializer.Serialize(w.Value),
        ValueMessagePackType = 0,
        StatusCode = 0,
        SourceTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
        ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    };

    private static DriverDataType MapDataType(int mxDataType) => mxDataType switch
    {
        0 => DriverDataType.Boolean,
        1 => DriverDataType.Int32,
        2 => DriverDataType.Float32,
        3 => DriverDataType.Float64,
        4 => DriverDataType.String,
        5 => DriverDataType.DateTime,
        _ => DriverDataType.String,
    };

    private static SecurityClassification MapSecurity(int mxSec) => mxSec switch
    {
        0 => SecurityClassification.FreeAccess,
        1 => SecurityClassification.Operate,
        2 => SecurityClassification.SecuredWrite,
        3 => SecurityClassification.VerifiedWrite,
        4 => SecurityClassification.Tune,
        5 => SecurityClassification.Configure,
        6 => SecurityClassification.ViewOnly,
        _ => SecurityClassification.FreeAccess,
    };

    private static AlarmSeverity MapSeverity(int sev) => sev switch
    {
        <= 250 => AlarmSeverity.Low,
        <= 500 => AlarmSeverity.Medium,
        <= 800 => AlarmSeverity.High,
        _ => AlarmSeverity.Critical,
    };

    public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult();
}

internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle
{
    public string DiagnosticId => $"galaxy-sub-{SubscriptionId}";
}

internal sealed record GalaxyAlarmSubscriptionHandle(string Id) : IAlarmSubscriptionHandle
{
    public string DiagnosticId => Id;
}

public sealed class GalaxyProxyOptions
{
    public required string DriverInstanceId { get; init; }
    public required string PipeName { get; init; }
    public required string SharedSecret { get; init; }
    public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
}
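
The `NotSupportedException` message in `MapAggregateToColumn` suggests computing Total as Average × Count on the caller side. A minimal sketch of that workaround follows; `IntervalSummary` and `TotalEstimate` are hypothetical names invented for this example, not part of the driver. It assumes the caller has already fetched the Average and Count aggregates for the same intervals, and that Average is a plain arithmetic mean over the counted samples. If the historian's Average is time-weighted, the product is only an approximation of the true sum.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: one Average and one Count result for the same interval.
public sealed record IntervalSummary(DateTime StartUtc, double Average, long ValueCount);

public static class TotalEstimate
{
    // Total = Average x Count; exact only when Average is an arithmetic mean.
    public static double FromAverageAndCount(double average, long valueCount) =>
        average * valueCount;

    // Applies the same product to every interval, preserving input order.
    public static IReadOnlyList<(DateTime StartUtc, double Total)> PerInterval(
        IEnumerable<IntervalSummary> rows) =>
        rows.Select(r => (r.StartUtc, FromAverageAndCount(r.Average, r.ValueCount)))
            .ToList();
}
```

An interval with no samples (a count of zero) yields a total of zero here; whether the caller should instead surface a bad-quality status for empty intervals is a design choice this sketch leaves open.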