From 6df1a79d35f8badccad6b1fb939f19e98fb885e8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 01:44:04 -0400 Subject: [PATCH 1/2] =?UTF-8?q?Phase=202=20PR=205=20=E2=80=94=20port=20Won?= =?UTF-8?q?derware=20Historian=20SDK=20into=20Driver.Galaxy.Host/Backend/H?= =?UTF-8?q?istorian/.=20The=20full=20v1=20Historian.Aveva=20code=20path=20?= =?UTF-8?q?(HistorianDataSource=20+=20HistorianClusterEndpointPicker=20+?= =?UTF-8?q?=20IHistorianConnectionFactory=20+=20SdkHistorianConnectionFact?= =?UTF-8?q?ory)=20now=20lives=20inside=20Galaxy.Host=20instead=20of=20the?= =?UTF-8?q?=20previously-required=20out-of-tree=20plugin=20+=20HistorianPl?= =?UTF-8?q?uginLoader=20AssemblyResolve=20hack,=20and=20MxAccessGalaxyBack?= =?UTF-8?q?end.HistoryReadAsync=20=E2=80=94=20which=20previously=20returne?= =?UTF-8?q?d=20a=20Phase=202=20Task=20B.1.h=20follow-up=20placeholder=20?= =?UTF-8?q?=E2=80=94=20now=20delegates=20to=20the=20ported=20HistorianData?= =?UTF-8?q?Source.ReadRawAsync,=20maps=20HistorianSample=20to=20GalaxyData?= =?UTF-8?q?Value=20via=20the=20IPC=20wire=20shape,=20and=20reports=20Succe?= =?UTF-8?q?ss=3Dtrue=20with=20per-tag=20HistoryTagValues=20arrays.=20OPC-U?= =?UTF-8?q?A-free=20surface=20inside=20Galaxy.Host:=20the=20v1=20code=20re?= =?UTF-8?q?turned=20Opc.Ua.DataValue=20on=20the=20hot=20path,=20which=20wo?= =?UTF-8?q?uld=20have=20required=20dragging=20OPCFoundation.NetStandard.Op?= =?UTF-8?q?c.Ua.Server=20into=20net48=20x86=20Galaxy.Host=20and=20bleeding?= =?UTF-8?q?=20OPC=20types=20across=20the=20IPC=20boundary=20=E2=80=94=20in?= =?UTF-8?q?stead,=20the=20port=20introduces=20HistorianSample=20(Value,=20?= =?UTF-8?q?Quality=20byte,=20TimestampUtc)=20+=20HistorianAggregateSample?= =?UTF-8?q?=20(Value,=20TimestampUtc)=20POCOs=20that=20carry=20the=20raw?= =?UTF-8?q?=20MX=20quality=20byte=20through=20the=20pipe=20unchanged,=20an?= =?UTF-8?q?d=20the=20OPC=20translation=20happens=20on=20the=20Proxy=20side?= =?UTF-8?q?=20via=20the=20existing=20QualityMapper=20that=20the=20live-rea?= =?UTF-8?q?d=20path=20already=20uses.=20Decision=20#13's=20IPC=20data-shap?= =?UTF-8?q?e=20contract=20survives=20intact=20=E2=80=94=20GalaxyDataValue?= =?UTF-8?q?=20(TagReference=20+=20ValueBytes=20MessagePack=20+=20ValueMess?= =?UTF-8?q?agePackType=20+=20StatusCode=20+=20SourceTimestampUtcUnixMs=20+?= =?UTF-8?q?=20ServerTimestampUtcUnixMs)=20=E2=80=94=20so=20no=20Shared.Con?= =?UTF-8?q?tracts=20wire=20break=20vs=20PR=204.=20Cluster=20failover=20pre?= =?UTF-8?q?served=20verbatim:=20HistorianClusterEndpointPicker=20is=20the?= =?UTF-8?q?=20thread-safe=20pure-logic=20picker=20ported=20verbatim=20with?= =?UTF-8?q?=20no=20SDK=20dependency=20(injected=20DateTime=20clock,=20per-?= =?UTF-8?q?node=20cooldown=20state,=20unknown-node-name=20tolerance,=20cas?= =?UTF-8?q?e-insensitive=20de-dup=20on=20configuration-order=20list),=20Co?= =?UTF-8?q?nnectToAnyHealthyNode=20iterates=20the=20picker's=20healthy=20c?= =?UTF-8?q?andidates,=20clones=20config=20per=20attempt,=20calls=20the=20f?= =?UTF-8?q?actory,=20marks=20healthy=20on=20success=20/=20failed=20on=20ex?= =?UTF-8?q?ception=20with=20the=20failure=20message=20stored=20for=20dashb?= =?UTF-8?q?oard=20surfacing,=20throws=20"All=20N=20healthy=20historian=20c?= =?UTF-8?q?andidate(s)=20failed"=20with=20the=20last=20exception=20chained?= =?UTF-8?q?=20when=20every=20node=20exhausts.=20Process=20path=20+=20Event?= =?UTF-8?q?=20path=20use=20separate=20HistorianAccess=20connections=20(Cre?= =?UTF-8?q?ateHistoryQuery=20vs=20CreateEventQuery=20vs=20CreateAnalogSumm?= =?UTF-8?q?aryQuery=20on=20the=20SDK=20surface)=20guarded=20by=20independe?= =?UTF-8?q?nt=20=5Fconnection/=5FeventConnection=20locks=20=E2=80=94=20a?= =?UTF-8?q?=20mid-query=20failure=20on=20one=20silo=20resets=20only=20that?= =?UTF-8?q?=20connection,=20the=20other=20stays=20open.=20Four=20SDK=20pat?= =?UTF-8?q?hs=20ported:=20ReadRawAsync=20(RetrievalMode.Full,=20BatchSize?= =?UTF-8?q?=20from=20config.MaxValuesPerRead,=20MoveNext=20pump,=20per-sam?= =?UTF-8?q?ple=20quality=20+=20value=20decode=20with=20the=20StringValue/V?= =?UTF-8?q?alue=20fallback=20the=20v1=20code=20did,=20limit-based=20early?= =?UTF-8?q?=20exit),=20ReadAggregateAsync=20(AnalogSummaryQuery=20+=20Reso?= =?UTF-8?q?lution=20in=20ms,=20ExtractAggregateValue=20maps=20Average/Mini?= =?UTF-8?q?mum/Maximum/ValueCount/First/Last/StdDev=20column=20names=20?= =?UTF-8?q?=E2=80=94=20the=20NodeId=20to=20column=20mapping=20is=20moved?= =?UTF-8?q?=20to=20the=20Proxy=20side=20since=20the=20IPC=20request=20carr?= =?UTF-8?q?ies=20a=20string=20column),=20ReadAtTimeAsync=20(per-timestamp?= =?UTF-8?q?=20HistoryQuery=20with=20RetrievalMode.Interpolated=20+=20Batch?= =?UTF-8?q?Size=3D1,=20returns=20Quality=3D0=20/=20Value=3Dnull=20for=20mi?= =?UTF-8?q?ssing=20samples),=20ReadEventsAsync=20(EventQuery=20+=20AddEven?= =?UTF-8?q?tFilter("Source",Equal,sourceName)=20when=20sourceName=20is=20n?= =?UTF-8?q?on-null,=20EventOrder.Ascending,=20EventCount=20=3D=20maxEvents?= =?UTF-8?q?=20or=20config.MaxValuesPerRead);=20GetHealthSnapshot=20returns?= =?UTF-8?q?=20the=20full=20runtime-health=20snapshot=20(TotalQueries/Succe?= =?UTF-8?q?sses/Failures=20+=20ConsecutiveFailures=20+=20LastSuccess/Failu?= =?UTF-8?q?reTime=20+=20LastError=20+=20ProcessConnectionOpen/EventConnect?= =?UTF-8?q?ionOpen=20+=20ActiveProcessNode/ActiveEventNode=20+=20per-node?= =?UTF-8?q?=20state=20list).=20ReadRaw=20is=20the=20only=20path=20wired=20?= =?UTF-8?q?through=20IPC=20in=20PR=205=20(HistoryReadRequest/HistoryTagVal?= =?UTF-8?q?ues/HistoryReadResponse=20already=20existed=20in=20Shared.Contr?= =?UTF-8?q?acts);=20Aggregate/AtTime/Events/Health=20are=20ported-but-not-?= =?UTF-8?q?yet-IPC-exposed=20=E2=80=94=20they=20stay=20internal=20to=20Gal?= =?UTF-8?q?axy.Host=20for=20PR=206+=20to=20surface=20via=20new=20contract?= =?UTF-8?q?=20message=20kinds=20(aggregate=20=3D=20OPC=20UA=20HistoryReadP?= =?UTF-8?q?rocessed,=20at-time=20=3D=20HistoryReadAtTime,=20events=20=3D?= =?UTF-8?q?=20HistoryReadEvents,=20health=20=3D=20admin=20dashboard=20IPC?= =?UTF-8?q?=20query).=20Galaxy.Host=20csproj=20gains=20aahClientManaged=20?= =?UTF-8?q?+=20aahClientCommon=20references=20with=20Private=3Dfalse=20(ma?= =?UTF-8?q?naged=20wrappers)=20+=20None=20items=20for=20aahClient.dll=20+?= =?UTF-8?q?=20Historian.CBE.dll=20+=20Historian.DPAPI.dll=20+=20ArchestrA.?= =?UTF-8?q?CloudHistorian.Contract.dll=20native=20satellites=20staged=20al?= =?UTF-8?q?ongside=20the=20host=20exe=20via=20CopyToOutputDirectory=3DPres?= =?UTF-8?q?erveNewest=20so=20aahClientManaged=20can=20P/Invoke=20into=20th?= =?UTF-8?q?em=20at=20runtime=20without=20an=20AssemblyResolve=20hook=20(cl?= =?UTF-8?q?eaner=20than=20the=20v1=20HistorianPluginLoader.cs=20180-LOC=20?= =?UTF-8?q?AssemblyResolve=20+=20Assembly.LoadFrom=20dance=20that=20existe?= =?UTF-8?q?d=20solely=20because=20the=20plugin=20was=20loaded=20late=20fro?= =?UTF-8?q?m=20Host/bin/Debug/net48/Historian/).=20Program.cs=20adds=20Bui?= =?UTF-8?q?ldHistorianIfEnabled()=20that=20reads=20OTOPCUA=5FHISTORIAN=5FE?= =?UTF-8?q?NABLED=20(true=20or=201)=20+=20OTOPCUA=5FHISTORIAN=5FSERVER=20+?= =?UTF-8?q?=20OTOPCUA=5FHISTORIAN=5FSERVERS=20(comma-separated=20cluster?= =?UTF-8?q?=20list=20overrides=20single-server)=20+=20OTOPCUA=5FHISTORIAN?= =?UTF-8?q?=5FPORT=20(default=2032568)=20+=20OTOPCUA=5FHISTORIAN=5FINTEGRA?= =?UTF-8?q?TED=20(default=20true)=20+=20OTOPCUA=5FHISTORIAN=5FUSER/OTOPCUA?= =?UTF-8?q?=5FHISTORIAN=5FPASS=20+=20OTOPCUA=5FHISTORIAN=5FTIMEOUT=5FSEC?= =?UTF-8?q?=20(30)=20+=20OTOPCUA=5FHISTORIAN=5FMAX=5FVALUES=20(10000)=20+?= =?UTF-8?q?=20OTOPCUA=5FHISTORIAN=5FCOOLDOWN=5FSEC=20(60),=20returns=20nul?= =?UTF-8?q?l=20when=20disabled=20so=20MxAccessGalaxyBackend.HistoryReadAsy?= =?UTF-8?q?nc=20surfaces=20a=20clean=20"Historian=20disabled"=20Success=3D?= =?UTF-8?q?false=20instead=20of=20a=20localhost-SDK=20hang;=20server.RunAs?= =?UTF-8?q?ync=20finally=20block=20now=20also=20casts=20backend=20to=20IDi?= =?UTF-8?q?sposable.Dispose()=20so=20the=20historian=20SDK=20connections?= =?UTF-8?q?=20get=20cleanly=20closed=20on=20Ctrl+C.=20MxAccessGalaxyBacken?= =?UTF-8?q?d=20gains=20an=20IHistorianDataSource=3F=20historian=20construc?= =?UTF-8?q?tor=20parameter=20(defaults=20null=20to=20preserve=20existing?= =?UTF-8?q?=20Host.Tests=20call=20sites=20that=20don't=20exercise=20Histor?= =?UTF-8?q?yRead),=20implements=20IDisposable=20that=20forwards=20to=20=5F?= =?UTF-8?q?historian.Dispose(),=20and=20the=20pragma=20warning=20disable?= =?UTF-8?q?=20CS0618=20is=20locally=20scoped=20to=20the=20ToDto(HistorianE?= =?UTF-8?q?vent)=20mapper=20since=20the=20SDK=20marks=20Id/Source/DisplayT?= =?UTF-8?q?ext/Severity=20obsolete=20but=20the=20replacement=20surface=20i?= =?UTF-8?q?sn't=20available=20in=20the=20aahClientManaged=20version=20we?= =?UTF-8?q?=20bind=20against=20=E2=80=94=20every=20other=20deprecated-SDK?= =?UTF-8?q?=20use=20still=20surfaces=20as=20an=20error=20under=20TreatWarn?= =?UTF-8?q?ingsAsErrors.=20Ported=20from=20v1=20Historian.Aveva=20unchange?= =?UTF-8?q?d:=20the=20CloneConfigWithServerName=20helper=20that=20preserve?= =?UTF-8?q?s=20every=20config=20field=20except=20ServerName=20per=20attemp?= =?UTF-8?q?t;=20the=20double-checked=20locking=20in=20EnsureConnected/Ensu?= =?UTF-8?q?reEventConnected=20(fast=20path=20=3D=20Volatile.Read=20outside?= =?UTF-8?q?=20lock,=20slow=20path=20acquires=20lock=20+=20re-checks=20+=20?= =?UTF-8?q?disposes=20any=20raced-in-parallel=20connection);=20HandleConne?= =?UTF-8?q?ctionError/HandleEventConnectionError=20that=20close=20the=20de?= =?UTF-8?q?ad=20connection,=20clear=20the=20active-node=20tracker,=20MarkF?= =?UTF-8?q?ailed=20the=20picker=20entry=20with=20the=20exception=20message?= =?UTF-8?q?=20so=20the=20node=20enters=20cooldown,=20and=20log=20the=20res?= =?UTF-8?q?et=20with=20node=3D=20for=20operator=20correlation;=20RecordSuc?= =?UTF-8?q?cess/RecordFailure=20that=20bump=20counters=20under=20=5Fhealth?= =?UTF-8?q?Lock.=20Tests:=20HistorianClusterEndpointPickerTests=20(7=20cas?= =?UTF-8?q?es)=20=E2=80=94=20single-node=20ServerName=20fallback=20when=20?= =?UTF-8?q?ServerNames=20empty,=20MarkFailed=20enters=20cooldown=20and=20s?= =?UTF-8?q?kips,=20cooldown=20expires=20after=20window,=20MarkHealthy=20im?= =?UTF-8?q?mediately=20clears,=20all-in-cooldown=20returns=20empty=20healt?= =?UTF-8?q?hy=20list,=20Snapshot=20reports=20failure=20count=20+=20last=20?= =?UTF-8?q?error=20+=20IsHealthy,=20case-insensitive=20de-dup=20on=20dupli?= =?UTF-8?q?cate=20hostnames.=20HistorianWiringTests=20(2=20cases)=20?= =?UTF-8?q?=E2=80=94=20HistoryReadAsync=20returns=20"Historian=20disabled"?= =?UTF-8?q?=20Success=3Dfalse=20when=20historian:null=20passed;=20HistoryR?= =?UTF-8?q?eadAsync=20with=20a=20fake=20IHistorianDataSource=20maps=20the?= =?UTF-8?q?=20returned=20HistorianSample=20(Value=3D42.5,=20Quality=3D192?= =?UTF-8?q?=20Good,=20Timestamp)=20to=20a=20GalaxyDataValue=20with=20Statu?= =?UTF-8?q?sCode=3D0u=20+=20SourceTimestampUtcUnixMs=20matching=20the=20sa?= =?UTF-8?q?mple=20+=20MessagePack-encoded=20value=20bytes.=20InternalsVisi?= =?UTF-8?q?bleTo("...Host.Tests")=20added=20to=20Galaxy.Host.csproj=20so?= =?UTF-8?q?=20tests=20can=20reach=20the=20internal=20HistorianClusterEndpo?= =?UTF-8?q?intPicker.=20Full=20Galaxy.Host.Tests=20suite:=2024=20pass=20/?= =?UTF-8?q?=200=20fail=20(9=20new=20historian=20+=2015=20pre-existing=20Me?= =?UTF-8?q?moryWatchdog/PostMortemMmf/RecyclePolicy/StaPump/EndToEndIpc/Ha?= =?UTF-8?q?ndshake).=20Full=20solution=20build:=200=20errors=20(202=20pre-?= =?UTF-8?q?existing=20warnings).=20The=20v1=20Historian.Aveva=20project=20?= =?UTF-8?q?+=20Historian.Aveva.Tests=20still=20build=20intact=20because=20?= =?UTF-8?q?the=20archive=20PR=20(Stream=20D.1=20destructive=20delete)=20is?= =?UTF-8?q?=20still=20ahead=20of=20us=20=E2=80=94=20PR=205=20intentionally?= =?UTF-8?q?=20does=20not=20delete=20either;=20once=20PR=202+3=20merge=20an?= =?UTF-8?q?d=20the=20archive-delete=20PR=20lands,=20a=20follow-up=20cleanu?= =?UTF-8?q?p=20can=20remove=20Historian.Aveva=20+=20its=204=20source=20fil?= =?UTF-8?q?es=20+=2018=20test=20cases.=20Alarm=20subsystem=20wire-up=20(On?= =?UTF-8?q?AlarmEvent=20raising=20from=20MxAccessGalaxyBackend=20via=20Ala?= =?UTF-8?q?rmExtension=20primitives)=20+=20host-status=20push=20(OnHostSta?= =?UTF-8?q?tusChanged=20via=20a=20ported=20GalaxyRuntimeProbeManager)=20re?= =?UTF-8?q?main=20PR=206=20candidates;=20they=20were=20on=20the=20same=20"?= =?UTF-8?q?Task=20B.1.h=20follow-up"=20list=20and=20share=20the=20IPC=20co?= =?UTF-8?q?nnection-sink=20wiring=20with=20the=20historian=20events=20path?= =?UTF-8?q?=20=E2=80=94=20it=20made=20PR=205=20scope-manageable=20to=20do?= =?UTF-8?q?=20Historian=20first=20since=20that's=20what=20has=20the=20bigg?= =?UTF-8?q?est=20surface=20area=20(981=20LOC=20v1=20plus=20SDK=20binding)?= =?UTF-8?q?=20and=20alarms/host-status=20have=20more=20bespoke=20integrati?= =?UTF-8?q?on=20with=20the=20existing=20MxAccess=20subscription=20fan-out.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../HistorianClusterEndpointPicker.cs | 129 ++++ .../Historian/HistorianClusterNodeState.cs | 18 + .../Historian/HistorianConfiguration.cs | 38 ++ .../Backend/Historian/HistorianDataSource.cs | 621 ++++++++++++++++++ .../Backend/Historian/HistorianEventDto.cs | 18 + .../Historian/HistorianHealthSnapshot.cs | 27 + .../Backend/Historian/HistorianSample.cs | 30 + .../Historian/IHistorianConnectionFactory.cs | 73 ++ .../Backend/Historian/IHistorianDataSource.cs | 34 + .../Backend/MxAccessGalaxyBackend.cs | 78 ++- .../Program.cs | 47 +- ...B.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj | 32 + .../HistorianClusterEndpointPickerTests.cs | 94 +++ .../HistorianWiringTests.cs | 109 +++ 14 files changed, 1339 insertions(+), 9 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs new file mode 100644 index 0000000..312207b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which + /// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands + /// out an ordered list of eligible candidates for the data source to try in sequence. + /// + internal sealed class HistorianClusterEndpointPicker + { + private readonly Func _clock; + private readonly TimeSpan _cooldown; + private readonly object _lock = new object(); + private readonly List _nodes; + + public HistorianClusterEndpointPicker(HistorianConfiguration config) + : this(config, () => DateTime.UtcNow) { } + + internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func clock) + { + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + _cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds)); + + var names = (config.ServerNames != null && config.ServerNames.Count > 0) + ? config.ServerNames + : new List { config.ServerName }; + + _nodes = names + .Where(n => !string.IsNullOrWhiteSpace(n)) + .Select(n => n.Trim()) + .Distinct(StringComparer.OrdinalIgnoreCase) + .Select(n => new NodeEntry { Name = n }) + .ToList(); + } + + public int NodeCount + { + get { lock (_lock) return _nodes.Count; } + } + + public IReadOnlyList GetHealthyNodes() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList(); + } + } + + public int HealthyNodeCount + { + get + { + lock (_lock) + { + var now = _clock(); + return _nodes.Count(n => IsHealthyAt(n, now)); + } + } + } + + public void MarkFailed(string node, string? error) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + + var now = _clock(); + entry.FailureCount++; + entry.LastError = error; + entry.LastFailureTime = now; + entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null; + } + } + + public void MarkHealthy(string node) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + entry.CooldownUntil = null; + } + } + + public List SnapshotNodeStates() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Select(n => new HistorianClusterNodeState + { + Name = n.Name, + IsHealthy = IsHealthyAt(n, now), + CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil, + FailureCount = n.FailureCount, + LastError = n.LastError, + LastFailureTime = n.LastFailureTime + }).ToList(); + } + } + + private static bool IsHealthyAt(NodeEntry entry, DateTime now) + { + return entry.CooldownUntil == null || entry.CooldownUntil <= now; + } + + private NodeEntry? FindEntry(string node) + { + for (var i = 0; i < _nodes.Count; i++) + if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase)) + return _nodes[i]; + return null; + } + + private sealed class NodeEntry + { + public string Name { get; set; } = ""; + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs new file mode 100644 index 0000000..5b78243 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time state of a single historian cluster node. One entry per configured node + /// appears inside . + /// + public sealed class HistorianClusterNodeState + { + public string Name { get; set; } = ""; + public bool IsHealthy { get; set; } + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs new file mode 100644 index 0000000..8d3ac17 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs @@ -0,0 +1,38 @@ +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Wonderware Historian SDK configuration. Populated from environment variables at Host + /// startup (see Program.cs) or from the Proxy's DriverInstance.DriverConfig + /// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation. + /// + public sealed class HistorianConfiguration + { + public bool Enabled { get; set; } = false; + + /// Single-node fallback when is empty. + public string ServerName { get; set; } = "localhost"; + + /// + /// Ordered cluster nodes. When non-empty, the data source tries each in order on connect, + /// falling through to the next on failure. A failed node is placed in cooldown for + /// before being re-eligible. + /// + public List ServerNames { get; set; } = new(); + + public int FailureCooldownSeconds { get; set; } = 60; + public bool IntegratedSecurity { get; set; } = true; + public string? UserName { get; set; } + public string? Password { get; set; } + public int Port { get; set; } = 32568; + public int CommandTimeoutSeconds { get; set; } = 30; + public int MaxValuesPerRead { get; set; } = 10000; + + /// + /// Outer safety timeout applied to sync-over-async Historian operations. Must be + /// comfortably larger than . + /// + public int RequestTimeoutSeconds { get; set; } = 60; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs new file mode 100644 index 0000000..d132aed --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs @@ -0,0 +1,621 @@ +using System; +using System.Collections.Generic; +using StringCollection = System.Collections.Specialized.StringCollection; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Reads historical data from the Wonderware Historian via the aahClientManaged SDK. + /// OPC-UA-free — emits / + /// which the Proxy maps to OPC UA DataValue on its side of the IPC. + /// + public sealed class HistorianDataSource : IHistorianDataSource + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly HistorianConfiguration _config; + private readonly object _connectionLock = new object(); + private readonly object _eventConnectionLock = new object(); + private readonly IHistorianConnectionFactory _factory; + private HistorianAccess? _connection; + private HistorianAccess? _eventConnection; + private bool _disposed; + + private readonly object _healthLock = new object(); + private long _totalSuccesses; + private long _totalFailures; + private int _consecutiveFailures; + private DateTime? _lastSuccessTime; + private DateTime? _lastFailureTime; + private string? _lastError; + private string? _activeProcessNode; + private string? _activeEventNode; + + private readonly HistorianClusterEndpointPicker _picker; + + public HistorianDataSource(HistorianConfiguration config) + : this(config, new SdkHistorianConnectionFactory(), null) { } + + internal HistorianDataSource( + HistorianConfiguration config, + IHistorianConnectionFactory factory, + HistorianClusterEndpointPicker? picker = null) + { + _config = config; + _factory = factory; + _picker = picker ?? new HistorianClusterEndpointPicker(config); + } + + private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type) + { + var candidates = _picker.GetHealthyNodes(); + if (candidates.Count == 0) + { + var total = _picker.NodeCount; + throw new InvalidOperationException( + total == 0 + ? "No historian nodes configured" + : $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to"); + } + + Exception? lastException = null; + foreach (var node in candidates) + { + var attemptConfig = CloneConfigWithServerName(node); + try + { + var conn = _factory.CreateAndConnect(attemptConfig, type); + _picker.MarkHealthy(node); + return (conn, node); + } + catch (Exception ex) + { + _picker.MarkFailed(node, ex.Message); + lastException = ex; + Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node); + } + } + + var inner = lastException?.Message ?? "(no detail)"; + throw new InvalidOperationException( + $"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}", + lastException); + } + + private HistorianConfiguration CloneConfigWithServerName(string serverName) + { + return new HistorianConfiguration + { + Enabled = _config.Enabled, + ServerName = serverName, + ServerNames = _config.ServerNames, + FailureCooldownSeconds = _config.FailureCooldownSeconds, + IntegratedSecurity = _config.IntegratedSecurity, + UserName = _config.UserName, + Password = _config.Password, + Port = _config.Port, + CommandTimeoutSeconds = _config.CommandTimeoutSeconds, + MaxValuesPerRead = _config.MaxValuesPerRead + }; + } + + public HistorianHealthSnapshot GetHealthSnapshot() + { + var nodeStates = _picker.SnapshotNodeStates(); + var healthyCount = 0; + foreach (var n in nodeStates) + if (n.IsHealthy) healthyCount++; + + lock (_healthLock) + { + return new HistorianHealthSnapshot + { + TotalQueries = _totalSuccesses + _totalFailures, + TotalSuccesses = _totalSuccesses, + TotalFailures = _totalFailures, + ConsecutiveFailures = _consecutiveFailures, + LastSuccessTime = _lastSuccessTime, + LastFailureTime = _lastFailureTime, + LastError = _lastError, + ProcessConnectionOpen = Volatile.Read(ref _connection) != null, + EventConnectionOpen = Volatile.Read(ref _eventConnection) != null, + ActiveProcessNode = _activeProcessNode, + ActiveEventNode = _activeEventNode, + NodeCount = nodeStates.Count, + HealthyNodeCount = healthyCount, + Nodes = nodeStates + }; + } + } + + private void RecordSuccess() + { + lock (_healthLock) + { + _totalSuccesses++; + _lastSuccessTime = DateTime.UtcNow; + _consecutiveFailures = 0; + _lastError = null; + } + } + + private void RecordFailure(string error) + { + lock (_healthLock) + { + _totalFailures++; + _lastFailureTime = DateTime.UtcNow; + _consecutiveFailures++; + _lastError = error; + } + } + + private void EnsureConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _connection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process); + + lock (_connectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_connection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _connection = conn; + lock (_healthLock) _activeProcessNode = winningNode; + Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleConnectionError(Exception? ex = null) + { + lock (_connectionLock) + { + if (_connection == null) return; + + try + { + _connection.CloseConnection(out _); + _connection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery"); + } + + _connection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeProcessNode; + _activeProcessNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + private void EnsureEventConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _eventConnection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event); + + lock (_eventConnectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_eventConnection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _eventConnection = conn; + lock (_healthLock) _activeEventNode = winningNode; + Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleEventConnectionError(Exception? ex = null) + { + lock (_eventConnectionLock) + { + if (_eventConnection == null) return; + + try + { + _eventConnection.CloseConnection(out _); + _eventConnection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery"); + } + + _eventConnection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeEventNode; + _activeEventNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + public Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + RetrievalMode = HistorianRetrievalMode.Full + }; + + if (maxValues > 0) + args.BatchSize = (uint)maxValues; + else if (_config.MaxValuesPerRead > 0) + args.BatchSize = (uint)_config.MaxValuesPerRead; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"raw StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead; + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = timestamp, + Quality = (byte)(result.OpcQuality & 0xFF), + }); + + count++; + if (limit > 0 && count >= limit) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName); + RecordFailure($"raw: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})", + tagName, results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + public Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateAnalogSummaryQuery(); + var args = new AnalogSummaryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + Resolution = (ulong)intervalMs + }; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"aggregate StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + var value = ExtractAggregateValue(result, aggregateColumn); + + results.Add(new HistorianAggregateSample + { + Value = value, + TimestampUtc = timestamp, + }); + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName); + RecordFailure($"aggregate: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values", + aggregateColumn, tagName, results.Count); + + return Task.FromResult(results); + } + + public Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default) + { + var results = new List(); + + if (timestamps == null || timestamps.Length == 0) + return Task.FromResult(results); + + try + { + EnsureConnected(); + + foreach (var timestamp in timestamps) + { + ct.ThrowIfCancellationRequested(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = timestamp, + EndDateTime = timestamp, + RetrievalMode = HistorianRetrievalMode.Interpolated, + BatchSize = 1 + }; + + if (!query.StartQuery(args, out var error)) + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, // Bad + }); + continue; + } + + if (query.MoveNext(out error)) + { + var result = query.QueryResult; + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = (byte)(result.OpcQuality & 0xFF), + }); + } + else + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, + }); + } + + query.EndQuery(out _); + } + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName); + RecordFailure($"at-time: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps", + tagName, results.Count, timestamps.Length); + + return Task.FromResult(results); + } + + public Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureEventConnected(); + + using var query = _eventConnection!.CreateEventQuery(); + var args = new EventQueryArgs + { + StartDateTime = startTime, + EndDateTime = endTime, + EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead, + QueryType = HistorianEventQueryType.Events, + EventOrder = HistorianEventOrder.Ascending + }; + + if (!string.IsNullOrEmpty(sourceName)) + { + query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _); + } + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode); + RecordFailure($"events StartQuery: {error.ErrorCode}"); + HandleEventConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + results.Add(ToDto(query.QueryResult)); + count++; + if (maxEvents > 0 && count >= maxEvents) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)"); + RecordFailure($"events: {ex.Message}"); + HandleEventConnectionError(ex); + } + + Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})", + sourceName ?? "(all)", results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + private static HistorianEventDto ToDto(HistorianEvent evt) + { + // The ArchestrA SDK marks these properties obsolete but still returns them; their + // successors aren't wired in the version we bind against. Using them is the documented + // v1 behavior — suppressed locally instead of project-wide so any non-event use of + // deprecated SDK surface still surfaces as an error. +#pragma warning disable CS0618 + return new HistorianEventDto + { + Id = evt.Id, + Source = evt.Source, + EventTime = evt.EventTime, + ReceivedTime = evt.ReceivedTime, + DisplayText = evt.DisplayText, + Severity = (ushort)evt.Severity + }; +#pragma warning restore CS0618 + } + + internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column) + { + switch (column) + { + case "Average": return result.Average; + case "Minimum": return result.Minimum; + case "Maximum": return result.Maximum; + case "ValueCount": return result.ValueCount; + case "First": return result.First; + case "Last": return result.Last; + case "StdDev": return result.StdDev; + default: return null; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try + { + _connection?.CloseConnection(out _); + _connection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK connection"); + } + + try + { + _eventConnection?.CloseConnection(out _); + _eventConnection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK event connection"); + } + + _connection = null; + _eventConnection = null; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs new file mode 100644 index 0000000..d01e164 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// SDK-free representation of a Historian event record. Prevents ArchestrA types from + /// leaking beyond HistorianDataSource. + /// + public sealed class HistorianEventDto + { + public Guid Id { get; set; } + public string? Source { get; set; } + public DateTime EventTime { get; set; } + public DateTime ReceivedTime { get; set; } + public string? DisplayText { get; set; } + public ushort Severity { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs new file mode 100644 index 0000000..d056435 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard + /// via an IPC health query (not wired in PR #5; deferred). + /// + public sealed class HistorianHealthSnapshot + { + public long TotalQueries { get; set; } + public long TotalSuccesses { get; set; } + public long TotalFailures { get; set; } + public int ConsecutiveFailures { get; set; } + public DateTime? LastSuccessTime { get; set; } + public DateTime? LastFailureTime { get; set; } + public string? LastError { get; set; } + public bool ProcessConnectionOpen { get; set; } + public bool EventConnectionOpen { get; set; } + public string? ActiveProcessNode { get; set; } + public string? ActiveEventNode { get; set; } + public int NodeCount { get; set; } + public int HealthyNodeCount { get; set; } + public List Nodes { get; set; } = new(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs new file mode 100644 index 0000000..3f78ffd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs @@ -0,0 +1,30 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free representation of a single historical data point. The Host returns these + /// across the IPC boundary as GalaxyDataValue; the Proxy maps quality and value to + /// OPC UA DataValue. Raw MX quality byte is preserved so the Proxy can use the same + /// quality mapper it already uses for live reads. + /// + public sealed class HistorianSample + { + public object? Value { get; set; } + + /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality). + public byte Quality { get; set; } + + public DateTime TimestampUtc { get; set; } + } + + /// + /// Result of . When is + /// null the aggregate is unavailable for that bucket (Proxy maps to BadNoData). + /// + public sealed class HistorianAggregateSample + { + public double? Value { get; set; } + public DateTime TimestampUtc { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs new file mode 100644 index 0000000..51001e9 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs @@ -0,0 +1,73 @@ +using System; +using System.Threading; +using ArchestrA; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that + /// control connection success, failure, and timeout behavior. + /// + internal interface IHistorianConnectionFactory + { + HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type); + } + + /// Production implementation — opens real Historian SDK connections. + internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory + { + public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type) + { + var conn = new HistorianAccess(); + + var args = new HistorianConnectionArgs + { + ServerName = config.ServerName, + TcpPort = (ushort)config.Port, + IntegratedSecurity = config.IntegratedSecurity, + UseArchestrAUser = config.IntegratedSecurity, + ConnectionType = type, + ReadOnly = true, + PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) + }; + + if (!config.IntegratedSecurity) + { + args.UserName = config.UserName ?? string.Empty; + args.Password = config.Password ?? string.Empty; + } + + if (!conn.OpenConnection(args, out var error)) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}"); + } + + var timeoutMs = config.CommandTimeoutSeconds * 1000; + var elapsed = 0; + while (elapsed < timeoutMs) + { + var status = new HistorianConnectionStatus(); + conn.GetConnectionStatus(ref status); + + if (status.ConnectedToServer) + return conn; + + if (status.ErrorOccurred) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Historian SDK connection failed: {status.Error}"); + } + + Thread.Sleep(250); + elapsed += 250; + } + + conn.Dispose(); + throw new TimeoutException( + $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s"); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs new file mode 100644 index 0000000..146ae1b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host. + /// Implementations read via the aahClient* SDK; the Proxy side maps returned samples + /// to OPC UA DataValue. + /// + public interface IHistorianDataSource : IDisposable + { + Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default); + + Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default); + + Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default); + + Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default); + + HistorianHealthSnapshot GetHealthSnapshot(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index 0134451..7cd543a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; @@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up /// (the v1 alarm subsystem is its own subtree). /// -public sealed class MxAccessGalaxyBackend : IGalaxyBackend +public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable { private readonly GalaxyRepository _repository; private readonly MxAccessClient _mx; + private readonly IHistorianDataSource? _historian; private long _nextSessionId; private long _nextSubscriptionId; @@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public event System.EventHandler? OnHostStatusChanged; #pragma warning restore CS0067 - public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) + public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { _repository = repository; _mx = mx; + _historian = historian; } public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) @@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; - public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) - => Task.FromResult(new HistoryReadResponse + public async Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) + { + if (_historian is null) + return new HistoryReadResponse + { + Success = false, + Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration", + Tags = Array.Empty(), + }; + + var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime; + var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime; + var tags = new List(req.TagReferences.Length); + + try { - Success = false, - Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)", - Tags = Array.Empty(), - }); + foreach (var reference in req.TagReferences) + { + var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false); + tags.Add(new HistoryTagValues + { + TagReference = reference, + Values = samples.Select(s => ToWire(reference, s)).ToArray(), + }); + } + return new HistoryReadResponse { Success = true, Tags = tags.ToArray() }; + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + return new HistoryReadResponse + { + Success = false, + Error = $"Historian read failed: {ex.Message}", + Tags = tags.ToArray(), + }; + } + } public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); + public void Dispose() => _historian?.Dispose(); + private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new() { TagReference = reference, @@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; + /// + /// Maps a (raw historian row, OPC-UA-free) to the IPC wire + /// shape. The Proxy decodes the MessagePack value and maps + /// through QualityMapper on its side of the pipe — we keep the raw byte here so + /// rich OPC DA status codes (e.g. BadNotConnected, UncertainSubNormal) survive + /// the hop intact. + /// + private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new() + { + TagReference = reference, + ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value), + ValueMessagePackType = 0, + StatusCode = MapHistorianQualityToOpcUa(sample.Quality), + SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + private static uint MapHistorianQualityToOpcUa(byte q) + { + // Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges. + // The Proxy may refine this when it decodes the wire frame. + if (q >= 192) return 0x00000000u; // Good + if (q >= 64) return 0x40000000u; // Uncertain + return 0x80000000u; // Bad + } + private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs index efff93f..2a5ceec 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs @@ -4,6 +4,7 @@ using System.Threading; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; @@ -66,9 +67,11 @@ public static class Program pump = new StaPump("Galaxy.Sta"); pump.WaitForStartedAsync().GetAwaiter().GetResult(); mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName); + var historian = BuildHistorianIfEnabled(); backend = new MxAccessGalaxyBackend( new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }), - mx); + mx, + historian); break; } @@ -77,6 +80,7 @@ public static class Program try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); } finally { + (backend as IDisposable)?.Dispose(); mx?.Dispose(); pump?.Dispose(); } @@ -91,4 +95,45 @@ public static class Program } finally { Log.CloseAndFlush(); } } + + /// + /// Builds a from the OTOPCUA_HISTORIAN_* environment + /// variables the supervisor passes at spawn time. Returns null when the historian is + /// disabled (default) so MxAccessGalaxyBackend.HistoryReadAsync returns a clear + /// "not configured" error instead of attempting an SDK connection to localhost. + /// + private static IHistorianDataSource? BuildHistorianIfEnabled() + { + var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"); + if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1") + return null; + + var cfg = new HistorianConfiguration + { + Enabled = true, + ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", + Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568), + IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase), + UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"), + Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"), + CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30), + MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000), + FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60), + }; + + var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS"); + if (!string.IsNullOrWhiteSpace(servers)) + cfg.ServerNames = new System.Collections.Generic.List( + servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)); + + Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}", + cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port); + return new HistorianDataSource(cfg); + } + + private static int TryParseInt(string envName, int defaultValue) + { + var raw = Environment.GetEnvironmentVariable(envName); + return int.TryParse(raw, out var parsed) ? parsed : defaultValue; + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj index bc8a16a..7498013 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj @@ -30,11 +30,43 @@ + + + + ..\..\lib\ArchestrA.MxAccess.dll true + + + ..\..\lib\aahClientManaged.dll + false + + + ..\..\lib\aahClientCommon.dll + false + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs new file mode 100644 index 0000000..4078080 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs @@ -0,0 +1,94 @@ +using System; +using System.Linq; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianClusterEndpointPickerTests + { + private static HistorianConfiguration Config(params string[] nodes) => new() + { + ServerName = "ignored", + ServerNames = nodes.ToList(), + FailureCooldownSeconds = 60, + }; + + [Fact] + public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty() + { + var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() }; + var p = new HistorianClusterEndpointPicker(cfg); + p.NodeCount.ShouldBe(1); + p.GetHealthyNodes().ShouldBe(new[] { "only-node" }); + } + + [Fact] + public void Failed_node_enters_cooldown_and_is_skipped() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + } + + [Fact] + public void Cooldown_expires_after_configured_window() + { + var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + + clock = clock.AddSeconds(61); + p.GetHealthyNodes().ShouldBe(new[] { "a", "b" }); + } + + [Fact] + public void MarkHealthy_immediately_clears_cooldown() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.MarkHealthy("a"); + p.GetHealthyNodes().ShouldBe(new[] { "a" }); + } + + [Fact] + public void All_nodes_in_cooldown_returns_empty_healthy_list() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + p.MarkFailed("a", "x"); + p.MarkFailed("b", "y"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.NodeCount.ShouldBe(2); + } + + [Fact] + public void Snapshot_reports_failure_count_and_last_error() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "first"); + p.MarkFailed("a", "second"); + + var snap = p.SnapshotNodeStates().Single(); + snap.FailureCount.ShouldBe(2); + snap.LastError.ShouldBe("second"); + snap.IsHealthy.ShouldBeFalse(); + snap.CooldownUntil.ShouldNotBeNull(); + } + + [Fact] + public void Duplicate_hostnames_are_deduplicated_case_insensitively() + { + var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB")); + p.NodeCount.ShouldBe(2); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs new file mode 100644 index 0000000..4c7800c --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianWiringTests + { + /// + /// When the Proxy sends a HistoryRead but the supervisor never enabled the historian + /// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a + /// self-explanatory error — not an exception or a hang against localhost. + /// + [Fact] + public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + historian: null); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TestTag" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("Historian disabled"); + resp.Tags.ShouldBeEmpty(); + } + + /// + /// When the historian is wired up, we expect the backend to call through and map + /// samples onto the IPC wire shape. Uses a fake + /// that returns a single known-good sample so we can assert the mapping stays sane. + /// + [Fact] + public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + var fake = new FakeHistorianDataSource(new HistorianSample + { + Value = 42.5, + Quality = 192, // Good + TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc), + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TankLevel" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Tags.Length.ShouldBe(1); + resp.Tags[0].TagReference.ShouldBe("TankLevel"); + resp.Tags[0].Values.Length.ShouldBe(1); + resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good + resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull(); + resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe( + new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + private sealed class FakeHistorianDataSource : IHistorianDataSource + { + private readonly HistorianSample _sample; + public FakeHistorianDataSource(HistorianSample sample) => _sample = sample; + + public Task> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List { _sample }); + + public Task> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + public void Dispose() { } + } + } +} -- 2.49.1 From 1c2bf74d38a69e20266d88fc07b01e2658ce0334 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 02:06:15 -0400 Subject: [PATCH 2/2] =?UTF-8?q?Phase=202=20PR=206=20=E2=80=94=20close=20th?= =?UTF-8?q?e=202=20low=20findings=20carried=20forward=20from=20PR=204.=20L?= =?UTF-8?q?ow=20finding=20#1=20($Heartbeat=20probe=20handle=20leak=20in=20?= =?UTF-8?q?MonitorLoopAsync):=20the=20probe=20calls=20=5Fproxy.AddItem(con?= =?UTF-8?q?nectionHandle,=20"$Heartbeat")=20on=20every=20monitor=20tick=20?= =?UTF-8?q?that=20observes=20the=20connection=20is=20past=20StaleThreshold?= =?UTF-8?q?,=20but=20previously=20discarded=20the=20returned=20item=20hand?= =?UTF-8?q?le=20=E2=80=94=20so=20every=20probe=20(one=20per=20MonitorInter?= =?UTF-8?q?val,=20default=205s)=20leaked=20one=20item=20handle=20into=20th?= =?UTF-8?q?e=20MXAccess=20proxy's=20internal=20handle=20table.=20Fix:=20ca?= =?UTF-8?q?pture=20the=20item=20handle,=20call=20RemoveItem(connectionHand?= =?UTF-8?q?le,=20probeHandle)=20in=20the=20InvokeAsync's=20finally=20block?= =?UTF-8?q?=20so=20it=20runs=20on=20the=20same=20pump=20turn=20as=20the=20?= =?UTF-8?q?AddItem,=20best-effort=20RemoveItem=20swallow=20so=20a=20dying?= =?UTF-8?q?=20proxy=20doesn't=20throw=20secondary=20exceptions=20out=20of?= =?UTF-8?q?=20the=20probe=20path.=20Probe=20ok=20becomes=20`probeHandle=20?= =?UTF-8?q?>=200`=20so=20any=20AddItem=20that=20returns=200=20(MXAccess's?= =?UTF-8?q?=20"could=20not=20create")=20counts=20as=20a=20failed=20probe,?= =?UTF-8?q?=20matching=20v1=20behavior.=20Low=20finding=20#2=20(subscripti?= =?UTF-8?q?on=20replay=20silently=20swallowed=20per-tag=20failures):=20aft?= =?UTF-8?q?er=20a=20reconnect,=20the=20replay=20loop=20iterates=20the=20pr?= =?UTF-8?q?e-reconnect=20subscription=20snapshot=20and=20calls=20Subscribe?= =?UTF-8?q?OnPumpAsync=20for=20each;=20previously=20those=20failures=20wen?= =?UTF-8?q?t=20into=20a=20bare=20catch=20{=20/*=20skip=20*/=20}=20so=20an?= =?UTF-8?q?=20operator=20had=20no=20signal=20when=20specific=20tags=20fail?= =?UTF-8?q?ed=20to=20re-subscribe=20=E2=80=94=20the=20first=20indication?= =?UTF-8?q?=20downstream=20was=20a=20quality=20drop=20on=20OPC=20UA=20clie?= =?UTF-8?q?nts.=20Fix:=20new=20SubscriptionReplayFailedEventArgs=20(TagRef?= =?UTF-8?q?erence=20+=20Exception)=20+=20SubscriptionReplayFailed=20event?= =?UTF-8?q?=20on=20MxAccessClient=20that=20fires=20once=20per=20tag=20that?= =?UTF-8?q?=20fails=20to=20re-subscribe,=20Log.Warning=20per=20failure=20w?= =?UTF-8?q?ith=20the=20reconnect=20counter=20+=20tag=20reference,=20and=20?= =?UTF-8?q?a=20summary=20log=20line=20at=20the=20end=20of=20the=20replay?= =?UTF-8?q?=20loop=20("{failed}=20of=20{total}=20failed"=20or=20"{total}?= =?UTF-8?q?=20re-subscribed=20cleanly").=20Serilog=20`using`=20+=20`ILogge?= =?UTF-8?q?r=20Log=20=3D=20Serilog.Log.ForContext()`=20add?= =?UTF-8?q?ed.=20Tests=20=E2=80=94=20MxAccessClientMonitorLoopTests=20(new?= =?UTF-8?q?=20file,=202=20cases):=20Heartbeat=5Fprobe=5Fcalls=5FRemoveItem?= =?UTF-8?q?=5Ffor=5Fevery=5FAddItem=20constructs=20a=20CountingProxy=20IMx?= =?UTF-8?q?Proxy=20that=20tracks=20AddItem/RemoveItem=20pair=20counts=20sc?= =?UTF-8?q?oped=20to=20the=20"$Heartbeat"=20address,=20runs=20the=20client?= =?UTF-8?q?=20with=20MonitorInterval=3D150ms=20+=20StaleThreshold=3D50ms?= =?UTF-8?q?=20for=20700ms,=20asserts=20HeartbeatAddCount=20>=201,=20Heartb?= =?UTF-8?q?eatAddCount=20=3D=3D=20HeartbeatRemoveCount,=20OutstandingHeart?= =?UTF-8?q?beatHandles=20=3D=3D=200=20after=20dispose;=20SubscriptionRepla?= =?UTF-8?q?yFailed=5Ffires=5Ffor=5Feach=5Ftag=5Fthat=5Ffails=5Fto=5Freplay?= =?UTF-8?q?=20uses=20a=20ReplayFailingProxy=20that=20throws=20on=20the=20n?= =?UTF-8?q?ext=20$Heartbeat=20probe=20(to=20trigger=20the=20reconnect=20pa?= =?UTF-8?q?th)=20and=20throws=20on=20the=20replay-time=20AddItem=20for=20s?= =?UTF-8?q?pecified=20tag=20names=20("BadTag.A",=20"BadTag.B"),=20subscrib?= =?UTF-8?q?es=20GoodTag.X=20+=20BadTag.A=20+=20BadTag.B=20before=20trigger?= =?UTF-8?q?ing=20probe=20failure,=20collects=20SubscriptionReplayFailed=20?= =?UTF-8?q?args=20into=20a=20ConcurrentBag,=20asserts=20exactly=202=20even?= =?UTF-8?q?ts=20fired=20and=20both=20bad=20tags=20are=20represented=20?= =?UTF-8?q?=E2=80=94=20GoodTag.X=20replays=20cleanly=20so=20it=20does=20no?= =?UTF-8?q?t=20fire.=20Host.Tests=20csproj=20gains=20a=20Reference=20to=20?= =?UTF-8?q?lib/ArchestrA.MxAccess.dll=20because=20IMxProxy's=20MxDataChang?= =?UTF-8?q?eHandler=20delegate=20signature=20mentions=20MXSTATUS=5FPROXY?= =?UTF-8?q?=20and=20the=20compiler=20resolves=20all=20delegate=20parameter?= =?UTF-8?q?=20types=20when=20a=20test=20class=20implements=20the=20interfa?= =?UTF-8?q?ce,=20even=20if=20the=20test=20code=20never=20names=20the=20typ?= =?UTF-8?q?e.=20No=20regressions:=20full=20Galaxy.Host.Tests=20Unit=20suit?= =?UTF-8?q?e=2026=20pass=20/=200=20fail=20(2=20new=20monitor-loop=20tests?= =?UTF-8?q?=20+=209=20PR5=20historian=20+=2015=20pre-existing=20PostMortem?= =?UTF-8?q?Mmf/RecyclePolicy/StaPump/MemoryWatchdog/EndToEndIpc/Handshake)?= =?UTF-8?q?.=20Galaxy.Host=20builds=20clean=20(0=20errors,=200=20warnings)?= =?UTF-8?q?=20=E2=80=94=20the=20new=20Serilog.Log.ForContext=20usage=20pic?= =?UTF-8?q?ks=20up=20the=20existing=20Serilog=20package=20ref=20that=20PR?= =?UTF-8?q?=204=20pulled=20in=20for=20the=20monitor-loop=20infrastructure.?= =?UTF-8?q?=20Both=20findings=20were=20flagged=20as=20non-blocking=20for?= =?UTF-8?q?=20PR=204=20merge=20and=20are=20now=20resolved=20alongside=20wh?= =?UTF-8?q?ichever=20merge=20order=20the=20reviewer=20picks;=20this=20PR?= =?UTF-8?q?=20branches=20off=20phase-2-pr4-findings=20so=20it=20can=20reba?= =?UTF-8?q?se=20cleanly=20if=20PR=204=20lands=20first=20or=20be=20re-based?= =?UTF-8?q?=20onto=20master=20after=20PR=204=20merges.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Backend/MxAccess/MxAccessClient.cs | 55 +++++- .../SubscriptionReplayFailedEventArgs.cs | 20 ++ .../MxAccessClientMonitorLoopTests.cs | 173 ++++++++++++++++++ ...WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj | 5 + 4 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs index de38f37..222bdef 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; +using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; @@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; /// public sealed class MxAccessClient : IDisposable { + private static readonly ILogger Log = Serilog.Log.ForContext(); + private readonly StaPump _pump; private readonly IMxProxy _proxy; private readonly string _clientName; @@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable /// Fires whenever the connection transitions Connected ↔ Disconnected. public event EventHandler? ConnectionStateChanged; + /// + /// Fires once per failed subscription replay after a reconnect. Carries the tag reference + /// and the exception so the backend can propagate the degradation signal (e.g. mark the + /// subscription bad on the Proxy side rather than silently losing its callback). Added for + /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an + /// operator would only find out that a specific subscription stopped updating through a + /// data-quality complaint from downstream. + /// + public event EventHandler? SubscriptionReplayFailed; + public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null) { _pump = pump; @@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable if (idle <= _options.StaleThreshold) continue; // Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's - // our reconnect signal. + // our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item + // handle; we must RemoveItem it on the same pump turn or the long-running monitor + // leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely). bool probeOk; try { probeOk = await _pump.InvokeAsync(() => { - // AddItem on the connection handle is cheap and round-trips through COM. - // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone. - try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; } + int probeHandle = 0; + try + { + probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat"); + return probeHandle > 0; + } catch { return false; } + finally + { + if (probeHandle > 0) + { + try { _proxy.RemoveItem(_connectionHandle, probeHandle); } + catch { /* proxy is dying; best-effort cleanup */ } + } + } }); } catch { probeOk = false; } @@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable _reconnectCount++; ConnectionStateChanged?.Invoke(this, true); - // Replay every subscription that was active before the disconnect. + // Replay every subscription that was active before the disconnect. PR 6 low + // finding #2: surface per-tag failures — log them and raise + // SubscriptionReplayFailed so the backend can propagate the degraded state + // (previously swallowed silently; downstream quality dropped without a signal). var snapshot = _addressToHandle.Keys.ToArray(); _addressToHandle.Clear(); _handleToAddress.Clear(); + var failed = 0; foreach (var fullRef in snapshot) { try { await SubscribeOnPumpAsync(fullRef); } - catch { /* skip — operator can re-subscribe */ } + catch (Exception subEx) + { + failed++; + Log.Warning(subEx, + "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}", + fullRef, _reconnectCount); + SubscriptionReplayFailed?.Invoke(this, + new SubscriptionReplayFailedEventArgs(fullRef, subEx)); + } } + if (failed > 0) + Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length); + else + Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length); + _lastObservedActivityUtc = DateTime.UtcNow; } catch diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs new file mode 100644 index 0000000..ee8f03b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs @@ -0,0 +1,20 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; + +/// +/// Fired by when a previously-active +/// subscription fails to be restored after a reconnect. The backend should treat the tag as +/// unhealthy until the next successful resubscribe. +/// +public sealed class SubscriptionReplayFailedEventArgs : EventArgs +{ + public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception) + { + TagReference = tagReference; + Exception = exception; + } + + public string TagReference { get; } + public Exception Exception { get; } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs new file mode 100644 index 0000000..c071788 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA.MxAccess; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests; + +[Trait("Category", "Unit")] +public sealed class MxAccessClientMonitorLoopTests +{ + /// + /// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it + /// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds, + /// which over a 24h uptime becomes thousands of leaked MXAccess handles and can + /// eventually exhaust the runtime proxy's handle table. + /// + [Fact] + public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem() + { + using var pump = new StaPump("Monitor.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new CountingProxy(); + var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(150), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + await client.ConnectAsync(); + + // Wait past StaleThreshold, then let several monitor cycles fire. + await Task.Delay(700); + + client.Dispose(); + + // One Heartbeat probe fires per monitor tick once the connection looks stale. + proxy.HeartbeatAddCount.ShouldBeGreaterThan(1); + // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle. + proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount); + proxy.OutstandingHeartbeatHandles.ShouldBe(0); + } + + /// + /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise + /// SubscriptionReplayFailed so the backend can propagate the degradation, not get + /// silently eaten. + /// + [Fact] + public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay() + { + using var pump = new StaPump("Replay.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" }); + var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(120), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + var failures = new ConcurrentBag(); + client.SubscriptionReplayFailed += (_, e) => failures.Add(e); + + await client.ConnectAsync(); + await client.SubscribeAsync("GoodTag.X", (_, _) => { }); + await client.SubscribeAsync("BadTag.A", (_, _) => { }); + await client.SubscribeAsync("BadTag.B", (_, _) => { }); + + proxy.TriggerProbeFailureOnNextCall(); + + // Wait for the monitor loop to probe → fail → reconnect → replay. + await Task.Delay(800); + + client.Dispose(); + + failures.Count.ShouldBe(2); + var names = new HashSet(); + foreach (var f in failures) names.Add(f.TagReference); + names.ShouldContain("BadTag.A"); + names.ShouldContain("BadTag.B"); + } + + // ----- test doubles ----- + + private sealed class CountingProxy : IMxProxy + { + private int _next = 1; + private readonly ConcurrentDictionary _live = new(); + + public int HeartbeatAddCount; + public int HeartbeatRemoveCount; + public int OutstandingHeartbeatHandles => _live.Count; + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + var h = Interlocked.Increment(ref _next); + _live[h] = address; + if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount); + return h; + } + + public void RemoveItem(int _, int itemHandle) + { + if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat") + Interlocked.Increment(ref HeartbeatRemoveCount); + } + + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } + + /// + /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall + /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the + /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the + /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once + /// per failing tag. + /// + private sealed class ReplayFailingProxy : IMxProxy + { + private int _next = 1; + private readonly HashSet _failOnReplay; + private int _probeFailOnce; + private readonly ConcurrentDictionary _replayedOnce = new(StringComparer.OrdinalIgnoreCase); + + public ReplayFailingProxy(IEnumerable failOnReplayForTags) + { + _failOnReplay = new HashSet(failOnReplayForTags, StringComparer.OrdinalIgnoreCase); + } + + public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1); + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1) + throw new InvalidOperationException("simulated probe failure"); + + // Fail only on the *replay* AddItem for listed tags — not the initial subscribe. + if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address)) + throw new InvalidOperationException($"simulated replay failure for {address}"); + + if (_failOnReplay.Contains(address)) _replayedOnce[address] = true; + return Interlocked.Increment(ref _next); + } + + public void RemoveItem(int _, int __) { } + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj index fd5d722..6f803d5 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj @@ -24,6 +24,11 @@ + + + ..\..\lib\ArchestrA.MxAccess.dll + -- 2.49.1