diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Observability/OtOpcUaTelemetry.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Observability/OtOpcUaTelemetry.cs new file mode 100644 index 0000000..efb9909 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Observability/OtOpcUaTelemetry.cs @@ -0,0 +1,81 @@ +using System.Diagnostics; +using System.Diagnostics.Metrics; + +namespace ZB.MOM.WW.OtOpcUa.Commons.Observability; + +/// +/// Central + definitions for OtOpcUa. +/// All Akka actors, the OPC UA publish path, and the deploy coordinator emit through these +/// pre-created instruments so a single OpenTelemetry / Prometheus binding in Host +/// catches everything. No exporter is required — instruments are no-op until a listener +/// attaches, so tests and dev hosts pay nothing for instrumentation that nobody scrapes. +/// +/// Instrument names follow the OpenTelemetry semantic convention pattern +/// otopcua.<subsystem>.<event>. Subsystem is one of: deploy, driver, +/// virtualtag, scriptedalarm, opcua, redundancy. +/// +public static class OtOpcUaTelemetry +{ + public const string MeterName = "ZB.MOM.WW.OtOpcUa"; + public const string ActivitySourceName = "ZB.MOM.WW.OtOpcUa"; + + /// Singleton all counters/histograms hang off. + public static readonly Meter Meter = new(MeterName); + + /// Singleton used to start spans wrapping deploy/apply/rebuild. + public static readonly ActivitySource ActivitySource = new(ActivitySourceName); + + // ---------------- Deployment / driver-host coordination ---------------- + + /// Incremented every time DriverHostActor finishes applying a deployment (Ack or Reject). + public static readonly Counter DeploymentApplied = + Meter.CreateCounter("otopcua.deploy.applied", unit: "{deployment}", + description: "Deployments applied by a driver-role node (outcome=ack|reject)."); + + /// Time from DriverHostActor receiving DispatchDeployment to emitting the ack/reject. + public static readonly Histogram DeploymentApplyDurationSec = + Meter.CreateHistogram("otopcua.deploy.apply.duration", unit: "s", + description: "Driver-role apply latency from DispatchDeployment → Ack/Reject."); + + /// DriverInstanceActor spawn count (added=new instance; stop=disposed). + public static readonly Counter DriverInstanceLifecycle = + Meter.CreateCounter("otopcua.driver.lifecycle", unit: "{event}", + description: "DriverInstanceActor lifecycle transitions (event=spawn|stop|fault)."); + + // ---------------- VirtualTag / ScriptedAlarm engines ---------------- + + public static readonly Counter VirtualTagEval = + Meter.CreateCounter("otopcua.virtualtag.eval", unit: "{eval}", + description: "Virtual-tag evaluations attempted (outcome=ok|fail|skip)."); + + public static readonly Counter ScriptedAlarmTransition = + Meter.CreateCounter("otopcua.scriptedalarm.transition", unit: "{transition}", + description: "Scripted-alarm state transitions (state=active|acknowledged|inactive)."); + + // ---------------- OPC UA address-space + redundancy ---------------- + + public static readonly Counter OpcUaSinkWrite = + Meter.CreateCounter("otopcua.opcua.sink.write", unit: "{write}", + description: "Writes that landed in IOpcUaAddressSpaceSink (kind=value|alarm|rebuild)."); + + public static readonly Counter ServiceLevelChange = + Meter.CreateCounter("otopcua.redundancy.service_level_change", unit: "{change}", + description: "OPC UA Server.ServiceLevel transitions emitted by the redundancy state."); + + // ---------------- Convenience helpers ---------------- + + /// + /// Starts a deploy span tagged with the deployment id. Caller disposes to close. Returns + /// null when no listener is attached so the call site stays cheap on undecorated builds. + /// + public static Activity? StartDeployApplySpan(string deploymentId) + { + var activity = ActivitySource.StartActivity("otopcua.deploy.apply", ActivityKind.Internal); + activity?.SetTag("otopcua.deployment_id", deploymentId); + return activity; + } + + /// Span wrapping a full OPC UA address-space rebuild (Phase7 plan → apply). + public static Activity? StartAddressSpaceRebuildSpan() + => ActivitySource.StartActivity("otopcua.opcua.address_space_rebuild", ActivityKind.Internal); +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Observability/ObservabilityExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Observability/ObservabilityExtensions.cs new file mode 100644 index 0000000..e1e6c7e --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Observability/ObservabilityExtensions.cs @@ -0,0 +1,38 @@ +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; + +namespace ZB.MOM.WW.OtOpcUa.Host.Observability; + +/// +/// Wires the OtOpcUa Meter + ActivitySource into OpenTelemetry and exposes a Prometheus +/// scrape endpoint at /metrics on the host pipeline. F13d slice — only the meter + +/// activity source declared in are surfaced; per-Akka +/// internals + ASP.NET request metrics stay off by default to keep the scrape payload +/// scoped to OtOpcUa-owned signals. +/// +public static class ObservabilityExtensions +{ + public static IServiceCollection AddOtOpcUaObservability(this IServiceCollection services) + { + services.AddOpenTelemetry() + .WithMetrics(b => b + .AddMeter(OtOpcUaTelemetry.MeterName) + .AddPrometheusExporter()) + .WithTracing(b => b + .AddSource(OtOpcUaTelemetry.ActivitySourceName)); + + return services; + } + + /// + /// Mounts the Prometheus scrape endpoint on the existing ASP.NET pipeline. Call after + /// app.UseAuthentication/UseAuthorization if metrics access should require auth; + /// the default leaves it unauthenticated for local Prometheus scrapes. + /// + public static IEndpointRouteBuilder MapOtOpcUaMetrics(this IEndpointRouteBuilder app) + { + app.MapPrometheusScrapingEndpoint("/metrics"); + return app; + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs index e20384f..8452a3c 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs @@ -11,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Host; using ZB.MOM.WW.OtOpcUa.Host.Drivers; using ZB.MOM.WW.OtOpcUa.Host.Health; +using ZB.MOM.WW.OtOpcUa.Host.Observability; using ZB.MOM.WW.OtOpcUa.Host.OpcUa; using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security; using ZB.MOM.WW.OtOpcUa.Runtime; @@ -94,6 +95,7 @@ if (hasAdmin) } builder.Services.AddOtOpcUaHealth(); +builder.Services.AddOtOpcUaObservability(); var app = builder.Build(); app.UseSerilogRequestLogging(); @@ -109,6 +111,7 @@ if (hasAdmin) } app.MapOtOpcUaHealth(); +app.MapOtOpcUaMetrics(); Log.Information("OtOpcUa.Host starting with roles=[{Roles}] (admin={HasAdmin}, driver={HasDriver})", string.Join(",", roles), hasAdmin, hasDriver); diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/ZB.MOM.WW.OtOpcUa.Host.csproj b/src/Server/ZB.MOM.WW.OtOpcUa.Host/ZB.MOM.WW.OtOpcUa.Host.csproj index ac72232..be53447 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/ZB.MOM.WW.OtOpcUa.Host.csproj +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/ZB.MOM.WW.OtOpcUa.Host.csproj @@ -15,6 +15,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 1934ccd..55798ec 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; @@ -5,6 +6,7 @@ using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; @@ -239,6 +241,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _applyingDeploymentId = deploymentId; Become(Applying); + using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString()); + span?.SetTag("otopcua.node_id", _localNode.ToString()); + span?.SetTag("otopcua.revision", revision.ToString()); + span?.SetTag("otopcua.correlation_id", correlation.ToString()); + var sw = Stopwatch.StartNew(); + // Persist Applying row (idempotent on PK). UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null); @@ -252,6 +260,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers // composition. The publish actor handles the load-compose-diff-apply pipeline; we // just forward the same correlation id so the audit trail joins up. _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation)); + OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "ack")); _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})", _localNode, deploymentId, revision, _children.Count); } @@ -259,10 +268,13 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers { UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message); SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation); + OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "reject")); + span?.SetStatus(ActivityStatusCode.Error, ex.Message); _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId); } finally { + OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds); _applyingDeploymentId = null; Become(Steady); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index f3185c8..f8dbcd4 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -1,5 +1,6 @@ using Akka.Actor; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; @@ -82,6 +83,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers _driver = driver; _driverInstanceId = driver.DriverInstanceId; _reconnectInterval = reconnectInterval; + OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, + new KeyValuePair("event", startStubbed ? "spawn_stub" : "spawn"), + new KeyValuePair("driver_type", driver.DriverType)); if (startStubbed) { Context.GetLogger().Info("[DEV-STUB] driver={Name} type={Type}", @@ -314,5 +318,8 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers DetachSubscription(); try { _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); } catch (Exception ex) { _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId); } + OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, + new KeyValuePair("event", "stop"), + new KeyValuePair("driver_type", _driver.DriverType)); } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 2c24826..1ef9ae7 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -3,6 +3,7 @@ using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; @@ -124,6 +125,7 @@ public sealed class OpcUaPublishActor : ReceiveActor { _sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc); Interlocked.Increment(ref _writes); + OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "value")); } catch (Exception ex) { @@ -137,6 +139,7 @@ public sealed class OpcUaPublishActor : ReceiveActor { _sink.WriteAlarmState(msg.AlarmNodeId, msg.Active, msg.Acknowledged, msg.TimestampUtc); Interlocked.Increment(ref _writes); + OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "alarm")); } catch (Exception ex) { @@ -146,12 +149,19 @@ public sealed class OpcUaPublishActor : ReceiveActor private void HandleRebuild(RebuildAddressSpace msg) { + using var span = OtOpcUaTelemetry.StartAddressSpaceRebuildSpan(); + span?.SetTag("otopcua.correlation_id", msg.Correlation.ToString()); + // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against // the latest deployment artifact. Without them, fall back to a raw sink rebuild — the // F10b/dev path before the integration completes. if (_dbFactory is null || _applier is null) { - try { _sink.RebuildAddressSpace(); } + try + { + _sink.RebuildAddressSpace(); + OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "rebuild")); + } catch (Exception ex) { _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", @@ -175,6 +185,7 @@ public sealed class OpcUaPublishActor : ReceiveActor var outcome = _applier.Apply(plan); _lastApplied = composition; + OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "rebuild")); _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})", msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled); } @@ -211,6 +222,8 @@ public sealed class OpcUaPublishActor : ReceiveActor try { _serviceLevel.Publish(msg.ServiceLevel); + OtOpcUaTelemetry.ServiceLevelChange.Add(1, + new KeyValuePair("level", msg.ServiceLevel)); _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel); } catch (Exception ex) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs index f6f2e53..77c2f85 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs @@ -4,6 +4,7 @@ using Akka.Event; using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; @@ -173,6 +174,9 @@ public sealed class ScriptedAlarmActor : ReceiveActor _ => next.ToString(), }; + OtOpcUaTelemetry.ScriptedAlarmTransition.Add(1, + new KeyValuePair("state", kind.ToLowerInvariant())); + var evt = new AlarmTransitionEvent( AlarmId: _config.AlarmId, EquipmentPath: _config.EquipmentPath, diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs index 01194a8..02b8b8b 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs @@ -3,6 +3,7 @@ using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.Types; namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; @@ -95,24 +96,35 @@ public sealed class VirtualTagActor : ReceiveActor catch (Exception ex) { _log.Warning(ex, "VirtualTag {Id}: evaluator threw", _virtualTagId); + OtOpcUaTelemetry.VirtualTagEval.Add(1, new KeyValuePair("outcome", "fail")); PublishLog("Error", $"evaluator threw: {ex.Message}"); return; } if (!result.Success) { + OtOpcUaTelemetry.VirtualTagEval.Add(1, new KeyValuePair("outcome", "fail")); PublishLog("Warning", result.Reason ?? "evaluator failure"); return; } // Skip no-change results. Real evaluator returns Ok(value); Null returns NoChange — both // safe because Null never produces a fresh value. - if (ReferenceEquals(result, VirtualTagEvalResult.NoChange)) return; + if (ReferenceEquals(result, VirtualTagEvalResult.NoChange)) + { + OtOpcUaTelemetry.VirtualTagEval.Add(1, new KeyValuePair("outcome", "skip")); + return; + } - if (_hasLastValue && Equals(_lastValue, result.Value)) return; + if (_hasLastValue && Equals(_lastValue, result.Value)) + { + OtOpcUaTelemetry.VirtualTagEval.Add(1, new KeyValuePair("outcome", "skip")); + return; + } _hasLastValue = true; _lastValue = result.Value; + OtOpcUaTelemetry.VirtualTagEval.Add(1, new KeyValuePair("outcome", "ok")); var evalResult = new EvaluationResult(_virtualTagId, result.Value, msg.TimestampUtc, CorrelationId.NewId()); Context.Parent.Tell(evalResult); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Observability/OtOpcUaTelemetryHookTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Observability/OtOpcUaTelemetryHookTests.cs new file mode 100644 index 0000000..462d609 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Observability/OtOpcUaTelemetryHookTests.cs @@ -0,0 +1,177 @@ +using System.Diagnostics; +using System.Diagnostics.Metrics; +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Observability; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Observability; + +/// +/// F13d — verifies the instrumentation sites actually emit on the central +/// meter + activity source. Each test attaches a one-shot +/// listener, exercises the instrumented path, then asserts the recorded measurement matches. +/// +public sealed class OtOpcUaTelemetryHookTests : RuntimeActorTestBase +{ + [Fact] + public void VirtualTagActor_evaluation_emits_otopcua_virtualtag_eval_counter() + { + using var recorder = new MeterRecorder("otopcua.virtualtag.eval"); + var parent = CreateTestProbe(); + var evaluator = new ConstEval(42); + + var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-tel-1", "expr", evaluator: evaluator)); + actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow)); + parent.ExpectMsg(); + + recorder.Total.ShouldBeGreaterThanOrEqualTo(1); + recorder.WithTag("outcome", "ok").ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public void OpcUaPublishActor_AttributeValueUpdate_emits_sink_write_counter() + { + using var recorder = new MeterRecorder("otopcua.opcua.sink.write"); + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + serviceLevel: NullServiceLevelPublisher.Instance, + subscribeRedundancyTopic: false, + localNode: Commons.Types.NodeId.Parse("test-node"))); + + actor.Tell(new OpcUaPublishActor.AttributeValueUpdate( + NodeId: "ns=2;s=tag-1", + Value: 42, + Quality: OpcUaQuality.Good, + TimestampUtc: DateTime.UtcNow)); + + AwaitAssertion(() => + { + recorder.Total.ShouldBeGreaterThanOrEqualTo(1); + recorder.WithTag("kind", "value").ShouldBeGreaterThanOrEqualTo(1); + }); + } + + [Fact] + public void RebuildAddressSpace_starts_an_address_space_rebuild_span() + { + using var spanRecorder = new ActivityRecorder("otopcua.opcua.address_space_rebuild"); + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + serviceLevel: NullServiceLevelPublisher.Instance, + subscribeRedundancyTopic: false, + localNode: Commons.Types.NodeId.Parse("test-node"))); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(Commons.Types.CorrelationId.NewId())); + + AwaitAssertion(() => spanRecorder.Activities.ShouldContain(a => a.OperationName == "otopcua.opcua.address_space_rebuild")); + } + + private void AwaitAssertion(Action assertion) + { + var deadline = DateTime.UtcNow.AddSeconds(2); + Exception? last = null; + while (DateTime.UtcNow < deadline) + { + try { assertion(); return; } + catch (Exception ex) { last = ex; Thread.Sleep(25); } + } + if (last is not null) throw last; + } + + /// Listens to a single instrument by name and tallies the values + tags. + private sealed class MeterRecorder : IDisposable + { + private readonly string _name; + private readonly MeterListener _listener; + private long _total; + private readonly List[]> _tagSets = new(); + private readonly object _gate = new(); + + public MeterRecorder(string instrumentName) + { + _name = instrumentName; + _listener = new MeterListener + { + InstrumentPublished = (instrument, listener) => + { + if (instrument.Meter.Name == OtOpcUaTelemetry.MeterName && instrument.Name == _name) + listener.EnableMeasurementEvents(instrument); + } + }; + _listener.SetMeasurementEventCallback((_, value, tags, _) => + { + lock (_gate) + { + _total += value; + _tagSets.Add(tags.ToArray()); + } + }); + _listener.Start(); + } + + public long Total { get { lock (_gate) return _total; } } + + public int WithTag(string key, string value) + { + lock (_gate) + { + return _tagSets.Count(set => set.Any(t => t.Key == key && Equals(t.Value, value))); + } + } + + public void Dispose() => _listener.Dispose(); + } + + /// Listens to a single ActivitySource by name and stores started Activities. + private sealed class ActivityRecorder : IDisposable + { + private readonly string _operationName; + private readonly ActivityListener _listener; + private readonly List _activities = new(); + private readonly object _gate = new(); + + public ActivityRecorder(string operationName) + { + _operationName = operationName; + _listener = new ActivityListener + { + ShouldListenTo = source => source.Name == OtOpcUaTelemetry.ActivitySourceName, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + ActivityStarted = activity => + { + if (activity.OperationName == _operationName) + { + lock (_gate) _activities.Add(activity); + } + } + }; + ActivitySource.AddActivityListener(_listener); + } + + public IReadOnlyList Activities { get { lock (_gate) return _activities.ToArray(); } } + + public void Dispose() => _listener.Dispose(); + } + + private sealed class ConstEval(object? value) : IVirtualTagEvaluator + { + public VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary dependencies) + => VirtualTagEvalResult.Ok(value); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public int Writes { get; private set; } + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) => Writes++; + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime occurredUtc) => Writes++; + public void RebuildAddressSpace() { /* recorded via span */ } + } +}