Extract historian into a runtime-loaded plugin so hosts without the Wonderware SDK can run with Historian.Enabled=false

The aahClientManaged SDK is now isolated in ZB.MOM.WW.LmxOpcUa.Historian.Aveva and loaded via HistorianPluginLoader from a Historian/ subfolder only when enabled, removing the SDK from Host's compile-time and deploy-time surface.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-12 15:16:07 -04:00
parent 9e1a180ce3
commit 9b42b61eb6
40 changed files with 739 additions and 15855 deletions

View File

@@ -0,0 +1,15 @@
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva
{
/// <summary>
/// Reflection entry point invoked by <c>HistorianPluginLoader</c> in the Host. Kept
/// deliberately simple so the plugin contract is a single static factory method.
/// </summary>
public static class AvevaHistorianPluginEntry
{
public static IHistorianDataSource Create(HistorianConfiguration config)
=> new HistorianDataSource(config);
}
}

View File

@@ -0,0 +1,533 @@
using System;
using System.Collections.Generic;
using StringCollection = System.Collections.Specialized.StringCollection;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Opc.Ua;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva
{
/// <summary>
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
/// </summary>
public sealed class HistorianDataSource : IHistorianDataSource
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
private readonly HistorianConfiguration _config;
private readonly object _connectionLock = new object();
private readonly object _eventConnectionLock = new object();
private readonly IHistorianConnectionFactory _factory;
private HistorianAccess? _connection;
private HistorianAccess? _eventConnection;
private bool _disposed;
/// <summary>
/// Initializes a Historian reader that translates OPC UA history requests into Wonderware Historian SDK queries.
/// </summary>
/// <param name="config">The Historian SDK connection settings used for runtime history lookups.</param>
public HistorianDataSource(HistorianConfiguration config)
: this(config, new SdkHistorianConnectionFactory()) { }
/// <summary>
/// Initializes a Historian reader with a custom connection factory for testing.
/// </summary>
internal HistorianDataSource(HistorianConfiguration config, IHistorianConnectionFactory factory)
{
_config = config;
_factory = factory;
}
private void EnsureConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
// Fast path: already connected (no lock needed)
if (Volatile.Read(ref _connection) != null)
return;
// Create and wait for connection outside the lock so concurrent history
// requests are not serialized behind a slow Historian handshake.
var conn = _factory.CreateAndConnect(_config, HistorianConnectionType.Process);
lock (_connectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_connection != null)
{
// Another thread connected while we were waiting
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_connection = conn;
Log.Information("Historian SDK connection opened to {Server}:{Port}", _config.ServerName, _config.Port);
}
}
private void HandleConnectionError(Exception? ex = null)
{
lock (_connectionLock)
{
if (_connection == null)
return;
try
{
_connection.CloseConnection(out _);
_connection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
}
_connection = null;
Log.Warning(ex, "Historian SDK connection reset — will reconnect on next request");
}
}
private void EnsureEventConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
if (Volatile.Read(ref _eventConnection) != null)
return;
var conn = _factory.CreateAndConnect(_config, HistorianConnectionType.Event);
lock (_eventConnectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_eventConnection != null)
{
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_eventConnection = conn;
Log.Information("Historian SDK event connection opened to {Server}:{Port}",
_config.ServerName, _config.Port);
}
}
private void HandleEventConnectionError(Exception? ex = null)
{
lock (_eventConnectionLock)
{
if (_eventConnection == null)
return;
try
{
_eventConnection.CloseConnection(out _);
_eventConnection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
}
_eventConnection = null;
Log.Warning(ex, "Historian SDK event connection reset — will reconnect on next request");
}
}
/// <inheritdoc />
public Task<List<DataValue>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default)
{
var results = new List<DataValue>();
try
{
EnsureConnected();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
RetrievalMode = HistorianRetrievalMode.Full
};
if (maxValues > 0)
args.BatchSize = (uint)maxValues;
else if (_config.MaxValuesPerRead > 0)
args.BatchSize = (uint)_config.MaxValuesPerRead;
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
HandleConnectionError();
return Task.FromResult(results);
}
var count = 0;
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
var quality = (byte)(result.OpcQuality & 0xFF);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = QualityMapper.MapToOpcUaStatusCode(QualityMapper.MapFromMxAccessQuality(quality))
});
count++;
if (limit > 0 && count >= limit)
break;
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
tagName, results.Count, startTime, endTime);
return Task.FromResult(results);
}
/// <inheritdoc />
public Task<List<DataValue>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default)
{
var results = new List<DataValue>();
try
{
EnsureConnected();
using var query = _connection!.CreateAnalogSummaryQuery();
var args = new AnalogSummaryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
Resolution = (ulong)intervalMs
};
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName,
error.ErrorCode);
HandleConnectionError();
return Task.FromResult(results);
}
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
var value = ExtractAggregateValue(result, aggregateColumn);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = value != null ? StatusCodes.Good : StatusCodes.BadNoData
});
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
aggregateColumn, tagName, results.Count);
return Task.FromResult(results);
}
/// <inheritdoc />
public Task<List<DataValue>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default)
{
var results = new List<DataValue>();
if (timestamps == null || timestamps.Length == 0)
return Task.FromResult(results);
try
{
EnsureConnected();
foreach (var timestamp in timestamps)
{
ct.ThrowIfCancellationRequested();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = timestamp,
EndDateTime = timestamp,
RetrievalMode = HistorianRetrievalMode.Interpolated,
BatchSize = 1
};
if (!query.StartQuery(args, out var error))
{
results.Add(new DataValue
{
Value = Variant.Null,
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = StatusCodes.BadNoData
});
continue;
}
if (query.MoveNext(out error))
{
var result = query.QueryResult;
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
var quality = (byte)(result.OpcQuality & 0xFF);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = QualityMapper.MapToOpcUaStatusCode(
QualityMapper.MapFromMxAccessQuality(quality))
});
}
else
{
results.Add(new DataValue
{
Value = Variant.Null,
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = StatusCodes.BadNoData
});
}
query.EndQuery(out _);
}
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
tagName, results.Count, timestamps.Length);
return Task.FromResult(results);
}
/// <inheritdoc />
public Task<List<HistorianEventDto>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default)
{
var results = new List<HistorianEventDto>();
try
{
EnsureEventConnected();
using var query = _eventConnection!.CreateEventQuery();
var args = new EventQueryArgs
{
StartDateTime = startTime,
EndDateTime = endTime,
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
QueryType = HistorianEventQueryType.Events,
EventOrder = HistorianEventOrder.Ascending
};
if (!string.IsNullOrEmpty(sourceName))
{
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
}
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
HandleEventConnectionError();
return Task.FromResult(results);
}
var count = 0;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
results.Add(ToDto(query.QueryResult));
count++;
if (maxEvents > 0 && count >= maxEvents)
break;
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
HandleEventConnectionError(ex);
}
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
sourceName ?? "(all)", results.Count, startTime, endTime);
return Task.FromResult(results);
}
private static HistorianEventDto ToDto(HistorianEvent evt)
{
return new HistorianEventDto
{
Id = evt.Id,
Source = evt.Source,
EventTime = evt.EventTime,
ReceivedTime = evt.ReceivedTime,
DisplayText = evt.DisplayText,
Severity = (ushort)evt.Severity
};
}
/// <summary>
/// Extracts the requested aggregate value from an <see cref="AnalogSummaryQueryResult"/> by column name.
/// </summary>
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
{
switch (column)
{
case "Average": return result.Average;
case "Minimum": return result.Minimum;
case "Maximum": return result.Maximum;
case "ValueCount": return result.ValueCount;
case "First": return result.First;
case "Last": return result.Last;
case "StdDev": return result.StdDev;
default: return null;
}
}
/// <summary>
/// Closes the Historian SDK connection and releases resources.
/// </summary>
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
try
{
_connection?.CloseConnection(out _);
_connection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK connection");
}
try
{
_eventConnection?.CloseConnection(out _);
_eventConnection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK event connection");
}
_connection = null;
_eventConnection = null;
}
}
}

View File

@@ -0,0 +1,81 @@
using System;
using System.Threading;
using ArchestrA;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva
{
/// <summary>
/// Creates and opens Historian SDK connections. Extracted so tests can inject
/// fakes that control connection success, failure, and timeout behavior.
/// </summary>
internal interface IHistorianConnectionFactory
{
/// <summary>
/// Creates a new Historian SDK connection, opens it, and waits until it is ready.
/// Throws on connection failure or timeout.
/// </summary>
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
}
/// <summary>
/// Production implementation that creates real Historian SDK connections.
/// </summary>
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
{
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
{
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = config.ServerName,
TcpPort = (ushort)config.Port,
IntegratedSecurity = config.IntegratedSecurity,
UseArchestrAUser = config.IntegratedSecurity,
ConnectionType = type,
ReadOnly = true,
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
};
if (!config.IntegratedSecurity)
{
args.UserName = config.UserName ?? string.Empty;
args.Password = config.Password ?? string.Empty;
}
if (!conn.OpenConnection(args, out var error))
{
conn.Dispose();
throw new InvalidOperationException(
$"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
}
// The SDK connects asynchronously — poll until the connection is ready
var timeoutMs = config.CommandTimeoutSeconds * 1000;
var elapsed = 0;
while (elapsed < timeoutMs)
{
var status = new HistorianConnectionStatus();
conn.GetConnectionStatus(ref status);
if (status.ConnectedToServer)
return conn;
if (status.ErrorOccurred)
{
conn.Dispose();
throw new InvalidOperationException(
$"Historian SDK connection failed: {status.Error}");
}
Thread.Sleep(250);
elapsed += 250;
}
conn.Dispose();
throw new TimeoutException(
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
}
}
}

View File

@@ -0,0 +1,87 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<PlatformTarget>x86</PlatformTarget>
<LangVersion>9.0</LangVersion>
<Nullable>enable</Nullable>
<RootNamespace>ZB.MOM.WW.LmxOpcUa.Historian.Aveva</RootNamespace>
<AssemblyName>ZB.MOM.WW.LmxOpcUa.Historian.Aveva</AssemblyName>
<!-- Plugin is loaded at runtime via Assembly.LoadFrom; never copy it as a CopyLocal dep. -->
<CopyLocalLockFileAssemblies>false</CopyLocalLockFileAssemblies>
<!-- Deploy next to Host.exe under bin/<cfg>/Historian/ so F5 works without a manual copy. -->
<HistorianPluginOutputDir>$(MSBuildThisFileDirectory)..\ZB.MOM.WW.LmxOpcUa.Host\bin\$(Configuration)\net48\Historian\</HistorianPluginOutputDir>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests"/>
</ItemGroup>
<ItemGroup>
<!-- Logging -->
<PackageReference Include="Serilog" Version="2.10.0"/>
<!-- OPC UA (for DataValue/StatusCodes used by the IHistorianDataSource surface) -->
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.5.374.126"/>
</ItemGroup>
<ItemGroup>
<!-- Private=false: the plugin binds to Host types at compile time but Host.exe must not be
copied into the plugin's output folder (it is already in the process). -->
<ProjectReference Include="..\ZB.MOM.WW.LmxOpcUa.Host\ZB.MOM.WW.LmxOpcUa.Host.csproj">
<Private>false</Private>
<ReferenceOutputAssembly>true</ReferenceOutputAssembly>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<!-- Wonderware Historian SDK -->
<Reference Include="aahClientManaged">
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
<Reference Include="aahClientCommon">
<HintPath>..\..\lib\aahClientCommon.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
</ItemGroup>
<ItemGroup>
<!-- Historian SDK native dependencies — copied beside the plugin DLL so the AssemblyResolve
handler in HistorianPluginLoader can find them when the plugin first JITs. -->
<None Include="..\..\lib\aahClient.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\aahClientCommon.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\aahClientManaged.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.CBE.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.DPAPI.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<Target Name="StageHistorianPluginForHost" AfterTargets="Build">
<ItemGroup>
<_HistorianStageFiles Include="$(OutDir)aahClient.dll"/>
<_HistorianStageFiles Include="$(OutDir)aahClientCommon.dll"/>
<_HistorianStageFiles Include="$(OutDir)aahClientManaged.dll"/>
<_HistorianStageFiles Include="$(OutDir)Historian.CBE.dll"/>
<_HistorianStageFiles Include="$(OutDir)Historian.DPAPI.dll"/>
<_HistorianStageFiles Include="$(OutDir)ArchestrA.CloudHistorian.Contract.dll"/>
<_HistorianStageFiles Include="$(OutDir)$(AssemblyName).dll"/>
<_HistorianStageFiles Include="$(OutDir)$(AssemblyName).pdb" Condition="Exists('$(OutDir)$(AssemblyName).pdb')"/>
</ItemGroup>
<MakeDir Directories="$(HistorianPluginOutputDir)"/>
<Copy SourceFiles="@(_HistorianStageFiles)" DestinationFolder="$(HistorianPluginOutputDir)" SkipUnchangedFiles="true"/>
</Target>
</Project>