Migrate historian from SQL to aahClientManaged SDK and resolve all OPC UA Part 11 gaps

Replace direct SQL queries against Historian Runtime database with the Wonderware
Historian managed SDK (ArchestrA.HistorianAccess). Add HistoryServerCapabilities node,
AggregateFunctions folder, continuation points, ReadAtTime interpolation, ReturnBounds,
ReadModified rejection, HistoricalDataConfiguration per node, historical event access,
and client-side StandardDeviation aggregate support. Remove screenshot tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-06 16:38:00 -04:00
parent 5c89a44255
commit 41f0e9ec4c
35 changed files with 1858 additions and 536 deletions

View File

@@ -45,7 +45,7 @@ public class HistoryReadCommand : CommandBase
/// <summary>
/// Gets the optional aggregate name used when the operator wants processed history instead of raw samples.
/// </summary>
[CommandOption("aggregate", Description = "Aggregate function: Average, Minimum, Maximum, Count, Start, End")]
[CommandOption("aggregate", Description = "Aggregate function: Average, Minimum, Maximum, Count, Start, End, StandardDeviation")]
public string? Aggregate { get; init; }
/// <summary>
@@ -127,8 +127,9 @@ public class HistoryReadCommand : CommandBase
"count" => AggregateType.Count,
"start" or "first" => AggregateType.Start,
"end" or "last" => AggregateType.End,
"standarddeviation" or "stddev" or "stdev" => AggregateType.StandardDeviation,
_ => throw new ArgumentException(
$"Unknown aggregate: '{name}'. Supported: Average, Minimum, Maximum, Count, Start, End")
$"Unknown aggregate: '{name}'. Supported: Average, Minimum, Maximum, Count, Start, End, StandardDeviation")
};
}
}

View File

@@ -21,6 +21,7 @@ public static class AggregateTypeMapper
AggregateType.Count => ObjectIds.AggregateFunction_Count,
AggregateType.Start => ObjectIds.AggregateFunction_Start,
AggregateType.End => ObjectIds.AggregateFunction_End,
AggregateType.StandardDeviation => ObjectIds.AggregateFunction_StandardDeviationPopulation,
_ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, "Unknown AggregateType value.")
};
}

View File

@@ -21,5 +21,8 @@ public enum AggregateType
Start,
/// <summary>Last value in the interval.</summary>
End
End,
/// <summary>Population standard deviation of values in the interval.</summary>
StandardDeviation
}

View File

@@ -49,7 +49,8 @@ public partial class HistoryViewModel : ObservableObject
AggregateType.Maximum,
AggregateType.Count,
AggregateType.Start,
AggregateType.End
AggregateType.End,
AggregateType.StandardDeviation
];
public bool IsAggregateRead => SelectedAggregateType != null;

View File

@@ -89,10 +89,9 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
string.Join(", ", config.Security.Profiles), config.Security.AutoAcceptClientCertificates,
config.Security.RejectSHA1Certificates, config.Security.MinimumCertificateKeySize);
if (config.Security.PkiRootPath != null)
Log.Information("Security.PkiRootPath={PkiRootPath}", config.Security.PkiRootPath);
if (config.Security.CertificateSubject != null)
Log.Information("Security.CertificateSubject={CertificateSubject}", config.Security.CertificateSubject);
Log.Information("Security.PkiRootPath={PkiRootPath}", config.Security.PkiRootPath ?? "(default)");
Log.Information("Security.CertificateSubject={CertificateSubject}", config.Security.CertificateSubject ?? "(default)");
Log.Information("Security.CertificateLifetimeMonths={Months}", config.Security.CertificateLifetimeMonths);
var unknownProfiles = config.Security.Profiles
.Where(p => !SecurityProfileResolver.ValidProfileNames.Contains(p, StringComparer.OrdinalIgnoreCase))
@@ -115,6 +114,37 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
config.Security.Profiles[0].Equals("None", StringComparison.OrdinalIgnoreCase))
Log.Warning("Only the 'None' security profile is configured — transport security is disabled");
// Historian
Log.Information("Historian.Enabled={Enabled}, ServerName={ServerName}, IntegratedSecurity={IntegratedSecurity}, Port={Port}",
config.Historian.Enabled, config.Historian.ServerName, config.Historian.IntegratedSecurity,
config.Historian.Port);
Log.Information("Historian.CommandTimeoutSeconds={Timeout}, MaxValuesPerRead={MaxValues}",
config.Historian.CommandTimeoutSeconds, config.Historian.MaxValuesPerRead);
if (config.Historian.Enabled)
{
if (string.IsNullOrWhiteSpace(config.Historian.ServerName))
{
Log.Error("Historian.ServerName must not be empty when Historian is enabled");
valid = false;
}
if (config.Historian.Port < 1 || config.Historian.Port > 65535)
{
Log.Error("Historian.Port must be between 1 and 65535");
valid = false;
}
if (!config.Historian.IntegratedSecurity && string.IsNullOrWhiteSpace(config.Historian.UserName))
{
Log.Error("Historian.UserName must not be empty when IntegratedSecurity is disabled");
valid = false;
}
if (!config.Historian.IntegratedSecurity && string.IsNullOrWhiteSpace(config.Historian.Password))
Log.Warning("Historian.Password is empty — authentication may fail");
}
// Authentication
Log.Information("Authentication.AllowAnonymous={AllowAnonymous}, AnonymousCanWrite={AnonymousCanWrite}",
config.Authentication.AllowAnonymous, config.Authentication.AnonymousCanWrite);

View File

@@ -1,7 +1,7 @@
namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
{
/// <summary>
/// Wonderware Historian database configuration for OPC UA historical data access.
/// Wonderware Historian SDK configuration for OPC UA historical data access.
/// </summary>
public class HistorianConfiguration
{
@@ -11,12 +11,33 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
public bool Enabled { get; set; } = false;
/// <summary>
/// Gets or sets the connection string for the Wonderware Historian Runtime database.
/// Gets or sets the Historian server hostname.
/// </summary>
public string ConnectionString { get; set; } = "Server=localhost;Database=Runtime;Integrated Security=true;";
public string ServerName { get; set; } = "localhost";
/// <summary>
/// Gets or sets the SQL command timeout in seconds for historian queries.
/// Gets or sets a value indicating whether Windows Integrated Security is used.
/// When false, <see cref="UserName"/> and <see cref="Password"/> are used instead.
/// </summary>
public bool IntegratedSecurity { get; set; } = true;
/// <summary>
/// Gets or sets the username for Historian authentication when <see cref="IntegratedSecurity"/> is false.
/// </summary>
public string? UserName { get; set; }
/// <summary>
/// Gets or sets the password for Historian authentication when <see cref="IntegratedSecurity"/> is false.
/// </summary>
public string? Password { get; set; }
/// <summary>
/// Gets or sets the Historian server TCP port.
/// </summary>
public int Port { get; set; } = 32568;
/// <summary>
/// Gets or sets the packet timeout in seconds for Historian SDK operations.
/// </summary>
public int CommandTimeoutSeconds { get; set; } = 30;
@@ -25,4 +46,4 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
/// </summary>
public int MaxValuesPerRead { get; set; } = 10000;
}
}
}

View File

@@ -0,0 +1,13 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<Costura>
<ExcludeAssemblies>
ArchestrA.MxAccess
aahClientManaged
aahClientCommon
aahClient
Historian.CBE
Historian.DPAPI
ArchestrA.CloudHistorian.Contract
</ExcludeAssemblies>
</Costura>
</Weavers>

View File

@@ -0,0 +1,176 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="Costura" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:all>
<xs:element minOccurs="0" maxOccurs="1" name="ExcludeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of assembly names to exclude from the default action of "embed all Copy Local references", delimited with line breaks</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="IncludeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of assembly names to include from the default action of "embed all Copy Local references", delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="ExcludeRuntimeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of runtime assembly names to exclude from the default action of "embed all Copy Local references", delimited with line breaks</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="IncludeRuntimeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of runtime assembly names to include from the default action of "embed all Copy Local references", delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="Unmanaged32Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>Obsolete, use UnmanagedWinX86Assemblies instead</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="UnmanagedWinX86Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged X86 (32 bit) assembly names to include, delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="Unmanaged64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>Obsolete, use UnmanagedWinX64Assemblies instead.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="UnmanagedWinX64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged X64 (64 bit) assembly names to include, delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="UnmanagedWinArm64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged Arm64 (64 bit) assembly names to include, delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element minOccurs="0" maxOccurs="1" name="PreloadOrder" type="xs:string">
<xs:annotation>
<xs:documentation>The order of preloaded assemblies, delimited with line breaks.</xs:documentation>
</xs:annotation>
</xs:element>
</xs:all>
<xs:attribute name="CreateTemporaryAssemblies" type="xs:boolean">
<xs:annotation>
<xs:documentation>This will copy embedded files to disk before loading them into memory. This is helpful for some scenarios that expected an assembly to be loaded from a physical file.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="IncludeDebugSymbols" type="xs:boolean">
<xs:annotation>
<xs:documentation>Controls if .pdbs for reference assemblies are also embedded.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="IncludeRuntimeReferences" type="xs:boolean">
<xs:annotation>
<xs:documentation>Controls if runtime assemblies are also embedded.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="UseRuntimeReferencePaths" type="xs:boolean">
<xs:annotation>
<xs:documentation>Controls whether the runtime assemblies are embedded with their full path or only with their assembly name.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="DisableCompression" type="xs:boolean">
<xs:annotation>
<xs:documentation>Embedded assemblies are compressed by default, and uncompressed when they are loaded. You can turn compression off with this option.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="DisableCleanup" type="xs:boolean">
<xs:annotation>
<xs:documentation>As part of Costura, embedded assemblies are no longer included as part of the build. This cleanup can be turned off.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="DisableEventSubscription" type="xs:boolean">
<xs:annotation>
<xs:documentation>The attach method no longer subscribes to the `AppDomain.AssemblyResolve` (.NET 4.x) and `AssemblyLoadContext.Resolving` (.NET 6.0+) events.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="LoadAtModuleInit" type="xs:boolean">
<xs:annotation>
<xs:documentation>Costura by default will load as part of the module initialization. This flag disables that behavior. Make sure you call CosturaUtility.Initialize() somewhere in your code.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="IgnoreSatelliteAssemblies" type="xs:boolean">
<xs:annotation>
<xs:documentation>Costura will by default use assemblies with a name like 'resources.dll' as a satellite resource and prepend the output path. This flag disables that behavior.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="ExcludeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of assembly names to exclude from the default action of "embed all Copy Local references", delimited with |</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="IncludeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of assembly names to include from the default action of "embed all Copy Local references", delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="ExcludeRuntimeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of runtime assembly names to exclude from the default action of "embed all Copy Local references", delimited with |</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="IncludeRuntimeAssemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of runtime assembly names to include from the default action of "embed all Copy Local references", delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="Unmanaged32Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>Obsolete, use UnmanagedWinX86Assemblies instead</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="UnmanagedWinX86Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged X86 (32 bit) assembly names to include, delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="Unmanaged64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>Obsolete, use UnmanagedWinX64Assemblies instead</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="UnmanagedWinX64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged X64 (64 bit) assembly names to include, delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="UnmanagedWinArm64Assemblies" type="xs:string">
<xs:annotation>
<xs:documentation>A list of unmanaged Arm64 (64 bit) assembly names to include, delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="PreloadOrder" type="xs:string">
<xs:annotation>
<xs:documentation>The order of preloaded assemblies, delimited with |.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

View File

@@ -1,8 +1,9 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using StringCollection = System.Collections.Specialized.StringCollection;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Opc.Ua;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
@@ -11,23 +12,151 @@ using ZB.MOM.WW.LmxOpcUa.Host.Domain;
namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
{
/// <summary>
/// Reads historical data from the Wonderware Historian Runtime database.
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
/// </summary>
public class HistorianDataSource
public class HistorianDataSource : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
private readonly HistorianConfiguration _config;
private readonly object _connectionLock = new object();
private readonly object _eventConnectionLock = new object();
private HistorianAccess? _connection;
private HistorianAccess? _eventConnection;
private bool _disposed;
/// <summary>
/// Initializes a Historian reader that translates OPC UA history requests into Wonderware Historian queries.
/// Initializes a Historian reader that translates OPC UA history requests into Wonderware Historian SDK queries.
/// </summary>
/// <param name="config">The Historian connection settings and command timeout used for runtime history lookups.</param>
/// <param name="config">The Historian SDK connection settings used for runtime history lookups.</param>
public HistorianDataSource(HistorianConfiguration config)
{
_config = config;
}
private void EnsureConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
lock (_connectionLock)
{
if (_connection != null)
return;
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = _config.ServerName,
TcpPort = (ushort)_config.Port,
IntegratedSecurity = _config.IntegratedSecurity,
ConnectionType = HistorianConnectionType.Process,
ReadOnly = true,
PacketTimeout = (uint)(_config.CommandTimeoutSeconds * 1000)
};
if (!_config.IntegratedSecurity)
{
args.UserName = _config.UserName ?? string.Empty;
args.Password = _config.Password ?? string.Empty;
}
if (!conn.OpenConnection(args, out var error))
{
conn.Dispose();
throw new InvalidOperationException(
$"Failed to open Historian SDK connection to {_config.ServerName}:{_config.Port}: {error.ErrorCode}");
}
_connection = conn;
Log.Information("Historian SDK connection opened to {Server}:{Port}", _config.ServerName, _config.Port);
}
}
private void HandleConnectionError(Exception? ex = null)
{
lock (_connectionLock)
{
if (_connection == null)
return;
try
{
_connection.CloseConnection(out _);
_connection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
}
_connection = null;
Log.Warning(ex, "Historian SDK connection reset — will reconnect on next request");
}
}
private void EnsureEventConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
lock (_eventConnectionLock)
{
if (_eventConnection != null)
return;
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = _config.ServerName,
TcpPort = (ushort)_config.Port,
IntegratedSecurity = _config.IntegratedSecurity,
ConnectionType = HistorianConnectionType.Event,
ReadOnly = true,
PacketTimeout = (uint)(_config.CommandTimeoutSeconds * 1000)
};
if (!_config.IntegratedSecurity)
{
args.UserName = _config.UserName ?? string.Empty;
args.Password = _config.Password ?? string.Empty;
}
if (!conn.OpenConnection(args, out var error))
{
conn.Dispose();
throw new InvalidOperationException(
$"Failed to open Historian SDK event connection to {_config.ServerName}:{_config.Port}: {error.ErrorCode}");
}
_eventConnection = conn;
Log.Information("Historian SDK event connection opened to {Server}:{Port}",
_config.ServerName, _config.Port);
}
}
private void HandleEventConnectionError(Exception? ex = null)
{
lock (_eventConnectionLock)
{
if (_eventConnection == null)
return;
try
{
_eventConnection.CloseConnection(out _);
_eventConnection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
}
_eventConnection = null;
Log.Warning(ex, "Historian SDK event connection reset — will reconnect on next request");
}
}
/// <summary>
/// Reads raw historical values for a tag from the Historian.
/// </summary>
@@ -35,52 +164,89 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
/// <param name="startTime">The inclusive start of the client-requested history window.</param>
/// <param name="endTime">The inclusive end of the client-requested history window.</param>
/// <param name="maxValues">The maximum number of samples to return when the OPC UA client limits the result set.</param>
/// <param name="ct">The cancellation token that aborts the database call when the OPC UA request is cancelled.</param>
public async Task<List<DataValue>> ReadRawAsync(
/// <param name="ct">The cancellation token that aborts the query when the OPC UA request is cancelled.</param>
public Task<List<DataValue>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default)
{
var results = new List<DataValue>();
var sql = maxValues > 0
? "SELECT TOP (@MaxValues) DateTime, Value, vValue, OPCQuality FROM Runtime.dbo.History WHERE TagName = @TagName AND wwTimezone='UTC' AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime"
: "SELECT DateTime, Value, vValue, OPCQuality FROM Runtime.dbo.History WHERE TagName = @TagName AND wwTimezone='UTC' AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime";
using var conn = new SqlConnection(_config.ConnectionString);
await conn.OpenAsync(ct);
using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds };
cmd.Parameters.AddWithValue("@TagName", tagName);
cmd.Parameters.AddWithValue("@StartTime", startTime);
cmd.Parameters.AddWithValue("@EndTime", endTime);
if (maxValues > 0)
cmd.Parameters.AddWithValue("@MaxValues", maxValues);
using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
try
{
var timestamp = DateTime.SpecifyKind(reader.GetDateTime(0), DateTimeKind.Utc);
object? value;
if (!reader.IsDBNull(1))
value = reader.GetDouble(1);
else if (!reader.IsDBNull(2))
value = reader.GetString(2);
else
value = null;
var quality = reader.IsDBNull(3) ? (byte)0 : Convert.ToByte(reader.GetValue(3));
EnsureConnected();
results.Add(new DataValue
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = QualityMapper.MapToOpcUaStatusCode(QualityMapper.MapFromMxAccessQuality(quality))
});
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
RetrievalMode = HistorianRetrievalMode.Full
};
if (maxValues > 0)
args.BatchSize = (uint)maxValues;
else if (_config.MaxValuesPerRead > 0)
args.BatchSize = (uint)_config.MaxValuesPerRead;
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
HandleConnectionError();
return Task.FromResult(results);
}
var count = 0;
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
var quality = (byte)(result.OpcQuality & 0xFF);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = QualityMapper.MapToOpcUaStatusCode(QualityMapper.MapFromMxAccessQuality(quality))
});
count++;
if (limit > 0 && count >= limit)
break;
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
tagName, results.Count, startTime, endTime);
return results;
return Task.FromResult(results);
}
/// <summary>
@@ -92,46 +258,260 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
/// <param name="intervalMs">The Wonderware summary resolution, in milliseconds, used to bucket aggregate values.</param>
/// <param name="aggregateColumn">The Historian summary column that matches the OPC UA aggregate function being requested.</param>
/// <param name="ct">The cancellation token that aborts the aggregate query when the client request is cancelled.</param>
public async Task<List<DataValue>> ReadAggregateAsync(
public Task<List<DataValue>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default)
{
var results = new List<DataValue>();
var sql = $"SELECT StartDateTime, [{aggregateColumn}] FROM Runtime.dbo.AnalogSummaryHistory " +
"WHERE TagName = @TagName AND wwTimezone='UTC' AND StartDateTime >= @StartTime AND StartDateTime <= @EndTime " +
"AND wwResolution = @Resolution ORDER BY StartDateTime";
using var conn = new SqlConnection(_config.ConnectionString);
await conn.OpenAsync(ct);
using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds };
cmd.Parameters.AddWithValue("@TagName", tagName);
cmd.Parameters.AddWithValue("@StartTime", startTime);
cmd.Parameters.AddWithValue("@EndTime", endTime);
cmd.Parameters.AddWithValue("@Resolution", (int)intervalMs);
using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
try
{
var timestamp = DateTime.SpecifyKind(reader.GetDateTime(0), DateTimeKind.Utc);
var value = reader.IsDBNull(1) ? (object?)null : reader.GetDouble(1);
EnsureConnected();
results.Add(new DataValue
using var query = _connection!.CreateAnalogSummaryQuery();
var args = new AnalogSummaryQueryArgs
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = value != null ? StatusCodes.Good : StatusCodes.BadNoData
});
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
Resolution = (ulong)intervalMs
};
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName,
error.ErrorCode);
HandleConnectionError();
return Task.FromResult(results);
}
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
var value = ExtractAggregateValue(result, aggregateColumn);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = value != null ? StatusCodes.Good : StatusCodes.BadNoData
});
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
aggregateColumn, tagName, results.Count);
return results;
return Task.FromResult(results);
}
/// <summary>
/// Reads interpolated values for a tag at specific timestamps from the Historian.
/// </summary>
/// <param name="tagName">The Wonderware tag name backing the OPC UA node.</param>
/// <param name="timestamps">The specific timestamps at which interpolated values are requested.</param>
/// <param name="ct">The cancellation token.</param>
public Task<List<DataValue>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default)
{
var results = new List<DataValue>();
if (timestamps == null || timestamps.Length == 0)
return Task.FromResult(results);
try
{
EnsureConnected();
foreach (var timestamp in timestamps)
{
ct.ThrowIfCancellationRequested();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = timestamp,
EndDateTime = timestamp,
RetrievalMode = HistorianRetrievalMode.Interpolated,
BatchSize = 1
};
if (!query.StartQuery(args, out var error))
{
results.Add(new DataValue
{
Value = Variant.Null,
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = StatusCodes.BadNoData
});
continue;
}
if (query.MoveNext(out error))
{
var result = query.QueryResult;
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
var quality = (byte)(result.OpcQuality & 0xFF);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = QualityMapper.MapToOpcUaStatusCode(
QualityMapper.MapFromMxAccessQuality(quality))
});
}
else
{
results.Add(new DataValue
{
Value = Variant.Null,
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = StatusCodes.BadNoData
});
}
query.EndQuery(out _);
}
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
HandleConnectionError(ex);
}
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
tagName, results.Count, timestamps.Length);
return Task.FromResult(results);
}
/// <summary>
/// Reads historical alarm/event records from the Historian event store.
/// </summary>
/// <param name="sourceName">Optional source name filter. Null returns all events.</param>
/// <param name="startTime">The inclusive start of the event history window.</param>
/// <param name="endTime">The inclusive end of the event history window.</param>
/// <param name="maxEvents">The maximum number of events to return.</param>
/// <param name="ct">The cancellation token.</param>
public Task<List<HistorianEvent>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default)
{
var results = new List<HistorianEvent>();
try
{
EnsureEventConnected();
using var query = _eventConnection!.CreateEventQuery();
var args = new EventQueryArgs
{
StartDateTime = startTime,
EndDateTime = endTime,
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
QueryType = HistorianEventQueryType.Events,
EventOrder = HistorianEventOrder.Ascending
};
if (!string.IsNullOrEmpty(sourceName))
{
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
}
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
HandleEventConnectionError();
return Task.FromResult(results);
}
var count = 0;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
results.Add(query.QueryResult);
count++;
if (maxEvents > 0 && count >= maxEvents)
break;
}
query.EndQuery(out _);
}
catch (OperationCanceledException)
{
throw;
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
HandleEventConnectionError(ex);
}
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
sourceName ?? "(all)", results.Count, startTime, endTime);
return Task.FromResult(results);
}
/// <summary>
/// Extracts the requested aggregate value from an <see cref="AnalogSummaryQueryResult"/> by column name.
/// </summary>
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
{
switch (column)
{
case "Average": return result.Average;
case "Minimum": return result.Minimum;
case "Maximum": return result.Maximum;
case "ValueCount": return result.ValueCount;
case "First": return result.First;
case "Last": return result.Last;
case "StdDev": return result.StdDev;
default: return null;
}
}
/// <summary>
/// Maps an OPC UA aggregate NodeId to the corresponding Historian column name.
@@ -156,5 +536,38 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
return "StdDev";
return null;
}
/// <summary>
/// Closes the Historian SDK connection and releases resources.
/// </summary>
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
try
{
_connection?.CloseConnection(out _);
_connection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK connection");
}
try
{
_eventConnection?.CloseConnection(out _);
_eventConnection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK event connection");
}
_connection = null;
_eventConnection = null;
}
}
}
}

View File

@@ -0,0 +1,88 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Opc.Ua;
using Serilog;
namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
{
/// <summary>
/// Manages continuation points for OPC UA HistoryRead requests that return
/// more data than the per-request limit allows.
/// </summary>
internal sealed class HistoryContinuationPointManager
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistoryContinuationPointManager>();
private readonly ConcurrentDictionary<Guid, StoredContinuation> _store = new();
private readonly TimeSpan _timeout = TimeSpan.FromMinutes(5);
/// <summary>
/// Stores remaining data values and returns a continuation point identifier.
/// </summary>
public byte[] Store(List<DataValue> remaining)
{
PurgeExpired();
var id = Guid.NewGuid();
_store[id] = new StoredContinuation(remaining, DateTime.UtcNow);
Log.Debug("Stored history continuation point {Id} with {Count} remaining values", id, remaining.Count);
return id.ToByteArray();
}
/// <summary>
/// Retrieves and removes the remaining data values for a continuation point.
/// Returns null if the continuation point is invalid or expired.
/// </summary>
public List<DataValue>? Retrieve(byte[] continuationPoint)
{
if (continuationPoint == null || continuationPoint.Length != 16)
return null;
var id = new Guid(continuationPoint);
if (!_store.TryRemove(id, out var stored))
return null;
if (DateTime.UtcNow - stored.CreatedAt > _timeout)
{
Log.Debug("History continuation point {Id} expired", id);
return null;
}
return stored.Values;
}
/// <summary>
/// Releases a continuation point without retrieving its data.
/// </summary>
public void Release(byte[] continuationPoint)
{
if (continuationPoint == null || continuationPoint.Length != 16)
return;
var id = new Guid(continuationPoint);
_store.TryRemove(id, out _);
}
private void PurgeExpired()
{
var cutoff = DateTime.UtcNow - _timeout;
foreach (var kvp in _store)
{
if (kvp.Value.CreatedAt < cutoff)
_store.TryRemove(kvp.Key, out _);
}
}
private sealed class StoredContinuation
{
public StoredContinuation(List<DataValue> values, DateTime createdAt)
{
Values = values;
CreatedAt = createdAt;
}
public List<DataValue> Values { get; }
public DateTime CreatedAt { get; }
}
}
}

View File

@@ -28,6 +28,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly bool _anonymousCanWrite;
private readonly AutoResetEvent _dataChangeSignal = new(false);
private readonly Dictionary<int, List<string>> _gobjectToTagRefs = new();
private readonly HistoryContinuationPointManager _historyContinuations = new();
private readonly HistorianDataSource? _historianDataSource;
private readonly PerformanceMetrics _metrics;
@@ -896,6 +897,44 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
variable.AccessLevel = accessLevel;
variable.UserAccessLevel = accessLevel;
variable.Historizing = attr.IsHistorized;
if (attr.IsHistorized)
{
var histConfigNodeId = new NodeId(nodeIdString + ".HAConfiguration", NamespaceIndex);
var histConfig = new BaseObjectState(variable)
{
NodeId = histConfigNodeId,
BrowseName = new QualifiedName("HAConfiguration", NamespaceIndex),
DisplayName = "HA Configuration",
TypeDefinitionId = ObjectTypeIds.HistoricalDataConfigurationType
};
var steppedProp = new PropertyState<bool>(histConfig)
{
NodeId = new NodeId(nodeIdString + ".HAConfiguration.Stepped", NamespaceIndex),
BrowseName = BrowseNames.Stepped,
DisplayName = "Stepped",
Value = false,
AccessLevel = AccessLevels.CurrentRead,
UserAccessLevel = AccessLevels.CurrentRead
};
histConfig.AddChild(steppedProp);
var definitionProp = new PropertyState<string>(histConfig)
{
NodeId = new NodeId(nodeIdString + ".HAConfiguration.Definition", NamespaceIndex),
BrowseName = BrowseNames.Definition,
DisplayName = "Definition",
Value = "Wonderware Historian",
AccessLevel = AccessLevels.CurrentRead,
UserAccessLevel = AccessLevels.CurrentRead
};
histConfig.AddChild(definitionProp);
variable.AddChild(histConfig);
AddPredefinedNode(SystemContext, histConfig);
}
variable.Value = NormalizePublishedValue(attr.FullTagReference, null);
variable.StatusCode = StatusCodes.BadWaitingForInitialData;
variable.Timestamp = DateTime.UtcNow;
@@ -1390,6 +1429,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
// Handle continuation point resumption
if (nodesToRead[idx].ContinuationPoint != null && nodesToRead[idx].ContinuationPoint.Length > 0)
{
var remaining = _historyContinuations.Retrieve(nodesToRead[idx].ContinuationPoint);
if (remaining == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadContinuationPointInvalid);
continue;
}
ReturnHistoryPage(remaining, details.NumValuesPerNode, results, errors, idx);
continue;
}
var nodeIdStr = handle.NodeId?.Identifier as string;
if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
@@ -1403,6 +1457,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
continue;
}
if (details.IsReadModified)
{
errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
continue;
}
try
{
var maxValues = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0;
@@ -1410,15 +1470,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
tagRef, details.StartTime, details.EndTime, maxValues)
.GetAwaiter().GetResult();
var historyData = new HistoryData();
historyData.DataValues.AddRange(dataValues);
if (details.ReturnBounds)
AddBoundingValues(dataValues, details.StartTime, details.EndTime);
results[idx] = new HistoryReadResult
{
StatusCode = StatusCodes.Good,
HistoryData = new ExtensionObject(historyData)
};
errors[idx] = ServiceResult.Good;
ReturnHistoryPage(dataValues, details.NumValuesPerNode, results, errors, idx);
}
catch (Exception ex)
{
@@ -1442,6 +1497,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
// Handle continuation point resumption
if (nodesToRead[idx].ContinuationPoint != null && nodesToRead[idx].ContinuationPoint.Length > 0)
{
var remaining = _historyContinuations.Retrieve(nodesToRead[idx].ContinuationPoint);
if (remaining == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadContinuationPointInvalid);
continue;
}
ReturnHistoryPage(remaining, 0, results, errors, idx);
continue;
}
var nodeIdStr = handle.NodeId?.Identifier as string;
if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
@@ -1476,6 +1546,58 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
details.ProcessingInterval, column)
.GetAwaiter().GetResult();
ReturnHistoryPage(dataValues, 0, results, errors, idx);
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef);
errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
/// <inheritdoc />
protected override void HistoryReadAtTime(
ServerSystemContext context,
ReadAtTimeDetails details,
TimestampsToReturn timestampsToReturn,
IList<HistoryReadValueId> nodesToRead,
IList<HistoryReadResult> results,
IList<ServiceResult> errors,
List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
var nodeIdStr = handle.NodeId?.Identifier as string;
if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
continue;
}
if (_historianDataSource == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
continue;
}
if (details.ReqTimes == null || details.ReqTimes.Count == 0)
{
errors[idx] = new ServiceResult(StatusCodes.BadInvalidArgument);
continue;
}
try
{
var timestamps = new DateTime[details.ReqTimes.Count];
for (var i = 0; i < details.ReqTimes.Count; i++)
timestamps[i] = details.ReqTimes[i];
var dataValues = _historianDataSource.ReadAtTimeAsync(tagRef, timestamps)
.GetAwaiter().GetResult();
var historyData = new HistoryData();
historyData.DataValues.AddRange(dataValues);
@@ -1488,12 +1610,149 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef);
Log.Warning(ex, "HistoryRead at-time failed for {TagRef}", tagRef);
errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
/// <inheritdoc />
protected override void HistoryReadEvents(
ServerSystemContext context,
ReadEventDetails details,
TimestampsToReturn timestampsToReturn,
IList<HistoryReadValueId> nodesToRead,
IList<HistoryReadResult> results,
IList<ServiceResult> errors,
List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
var nodeIdStr = handle.NodeId?.Identifier as string;
if (_historianDataSource == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
continue;
}
// Resolve the source name for event filtering.
// Alarm condition nodes end with ".Condition" — strip to get the source tag.
// Area/object nodes filter by Source_Name matching the browse name.
string? sourceName = null;
if (nodeIdStr != null)
{
if (nodeIdStr.EndsWith(".Condition"))
{
var baseTag = nodeIdStr.Substring(0, nodeIdStr.Length - ".Condition".Length);
sourceName = baseTag;
}
else if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
sourceName = tagRef;
}
}
try
{
var maxEvents = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0;
var events = _historianDataSource.ReadEventsAsync(
sourceName, details.StartTime, details.EndTime, maxEvents)
.GetAwaiter().GetResult();
var historyEvent = new HistoryEvent();
foreach (var evt in events)
{
// Build the standard event field list per OPC UA Part 11
// Fields: EventId, EventType, SourceNode, SourceName, Time, ReceiveTime,
// Message, Severity
var fields = new HistoryEventFieldList();
fields.EventFields.Add(new Variant(evt.Id.ToByteArray()));
fields.EventFields.Add(new Variant(ObjectTypeIds.AlarmConditionType));
fields.EventFields.Add(new Variant(
nodeIdStr != null ? new NodeId(nodeIdStr, NamespaceIndex) : NodeId.Null));
fields.EventFields.Add(new Variant(evt.Source ?? ""));
fields.EventFields.Add(new Variant(
DateTime.SpecifyKind(evt.EventTime, DateTimeKind.Utc)));
fields.EventFields.Add(new Variant(
DateTime.SpecifyKind(evt.ReceivedTime, DateTimeKind.Utc)));
fields.EventFields.Add(new Variant(new LocalizedText(evt.DisplayText ?? "")));
fields.EventFields.Add(new Variant((ushort)evt.Severity));
historyEvent.Events.Add(fields);
}
results[idx] = new HistoryReadResult
{
StatusCode = StatusCodes.Good,
HistoryData = new ExtensionObject(historyEvent)
};
errors[idx] = ServiceResult.Good;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead events failed for {NodeId}", nodeIdStr);
errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
private void ReturnHistoryPage(List<DataValue> dataValues, uint numValuesPerNode,
IList<HistoryReadResult> results, IList<ServiceResult> errors, int idx)
{
var pageSize = numValuesPerNode > 0 ? (int)numValuesPerNode : dataValues.Count;
var historyData = new HistoryData();
byte[]? continuationPoint = null;
if (dataValues.Count > pageSize)
{
historyData.DataValues.AddRange(dataValues.GetRange(0, pageSize));
var remainder = dataValues.GetRange(pageSize, dataValues.Count - pageSize);
continuationPoint = _historyContinuations.Store(remainder);
}
else
{
historyData.DataValues.AddRange(dataValues);
}
results[idx] = new HistoryReadResult
{
StatusCode = StatusCodes.Good,
HistoryData = new ExtensionObject(historyData),
ContinuationPoint = continuationPoint
};
errors[idx] = ServiceResult.Good;
}
private static void AddBoundingValues(List<DataValue> dataValues, DateTime startTime, DateTime endTime)
{
// Insert start bound if first sample doesn't match start time
if (dataValues.Count == 0 || dataValues[0].SourceTimestamp != startTime)
{
dataValues.Insert(0, new DataValue
{
Value = Variant.Null,
SourceTimestamp = startTime,
ServerTimestamp = startTime,
StatusCode = StatusCodes.BadBoundNotFound
});
}
// Append end bound if last sample doesn't match end time
if (dataValues.Count == 0 || dataValues[dataValues.Count - 1].SourceTimestamp != endTime)
{
dataValues.Add(new DataValue
{
Value = Variant.Null,
SourceTimestamp = endTime,
ServerTimestamp = endTime,
StatusCode = StatusCodes.BadBoundNotFound
});
}
}
#endregion
#region Subscription Delivery

View File

@@ -109,6 +109,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
server.SessionManager.ImpersonateUser += OnImpersonateUser;
ConfigureRedundancy(server);
ConfigureHistoryCapabilities(server);
}
private void ConfigureRedundancy(IServerInternal server)
@@ -162,6 +163,118 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
}
private void ConfigureHistoryCapabilities(IServerInternal server)
{
if (_historianDataSource == null)
return;
try
{
var dnm = server.DiagnosticsNodeManager;
var ctx = server.DefaultSystemContext;
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_AccessHistoryDataCapability, true);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_AccessHistoryEventsCapability,
_alarmTrackingEnabled);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_MaxReturnDataValues,
(uint)(_historianDataSource != null ? 10000 : 0));
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_MaxReturnEventValues, (uint)0);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_InsertDataCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_ReplaceDataCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_UpdateDataCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_DeleteRawCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_DeleteAtTimeCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_InsertEventCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_ReplaceEventCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_UpdateEventCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_DeleteEventCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_InsertAnnotationCapability, false);
SetPredefinedVariable(dnm, ctx,
VariableIds.HistoryServerCapabilities_ServerTimestampSupported, true);
// Add aggregate function references under the AggregateFunctions folder
var aggFolderNode = dnm?.FindPredefinedNode(
ObjectIds.HistoryServerCapabilities_AggregateFunctions,
typeof(FolderState)) as FolderState;
if (aggFolderNode != null)
{
var aggregateIds = new[]
{
ObjectIds.AggregateFunction_Average,
ObjectIds.AggregateFunction_Minimum,
ObjectIds.AggregateFunction_Maximum,
ObjectIds.AggregateFunction_Count,
ObjectIds.AggregateFunction_Start,
ObjectIds.AggregateFunction_End,
ObjectIds.AggregateFunction_StandardDeviationPopulation
};
foreach (var aggId in aggregateIds)
{
var aggNode = dnm?.FindPredefinedNode(aggId, typeof(BaseObjectState)) as BaseObjectState;
if (aggNode != null)
{
try
{
aggFolderNode.AddReference(ReferenceTypeIds.Organizes, false, aggNode.NodeId);
}
catch (ArgumentException)
{
// Reference already exists — skip
}
try
{
aggNode.AddReference(ReferenceTypeIds.Organizes, true, aggFolderNode.NodeId);
}
catch (ArgumentException)
{
// Reference already exists — skip
}
}
}
Log.Information("HistoryServerCapabilities configured with {Count} aggregate functions",
aggregateIds.Length);
}
else
{
Log.Warning("AggregateFunctions folder not found in predefined nodes");
}
}
catch (Exception ex)
{
Log.Warning(ex,
"Failed to configure HistoryServerCapabilities — history discovery may not work for clients");
}
}
private static void SetPredefinedVariable(DiagnosticsNodeManager? dnm, ServerSystemContext ctx,
NodeId variableId, object value)
{
var node = dnm?.FindPredefinedNode(variableId, typeof(BaseVariableState)) as BaseVariableState;
if (node != null)
{
node.Value = value;
node.ClearChangeMasks(ctx, false);
}
}
/// <summary>
/// Updates the server's ServiceLevel based on current runtime health.
/// Called by the service layer when MXAccess or DB health changes.

View File

@@ -31,6 +31,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
private CancellationTokenSource? _cts;
private HealthCheckService? _healthCheck;
private HistorianDataSource? _historianDataSource;
private MxAccessClient? _mxAccessClient;
private IMxAccessClient? _mxAccessClientForWiring;
private StaComThread? _staThread;
@@ -214,7 +215,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
// Step 8: Create OPC UA server host + node manager
var effectiveMxClient = (IMxAccessClient?)_mxAccessClient ??
_mxAccessClientForWiring ?? new NullMxAccessClient();
var historianDataSource = _config.Historian.Enabled
_historianDataSource = _config.Historian.Enabled
? new HistorianDataSource(_config.Historian)
: null;
IUserAuthenticationProvider? authProvider = null;
@@ -230,7 +231,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
_config.Authentication.Ldap.BaseDN);
}
ServerHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, Metrics, historianDataSource,
ServerHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, Metrics, _historianDataSource,
_config.Authentication, authProvider, _config.Security, _config.Redundancy);
// Step 9-10: Query hierarchy, start server, build address space
@@ -329,6 +330,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
}
_staThread?.Dispose();
_historianDataSource?.Dispose();
StatusWeb?.Dispose();
Metrics?.Dispose();

View File

@@ -29,6 +29,9 @@
<!-- Configuration -->
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1"/>
<!-- Single-EXE bundling -->
<PackageReference Include="Costura.Fody" Version="6.0.0-alpha0384" PrivateAssets="all"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0"/>
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.0.1"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0"/>
@@ -45,6 +48,30 @@
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
<!-- Wonderware Historian SDK -->
<Reference Include="aahClientManaged">
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
</ItemGroup>
<ItemGroup>
<!-- Historian SDK native dependencies -->
<None Include="..\..\lib\aahClient.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\aahClientCommon.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.CBE.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.DPAPI.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>

View File

@@ -71,7 +71,11 @@
},
"Historian": {
"Enabled": false,
"ConnectionString": "Server=localhost;Database=Runtime;Integrated Security=true;",
"ServerName": "localhost",
"IntegratedSecurity": true,
"UserName": null,
"Password": null,
"Port": 32568,
"CommandTimeoutSeconds": 30,
"MaxValuesPerRead": 10000
}