using System.Diagnostics;
using System.Diagnostics.Metrics;
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Observability;

/// <summary>
/// F13d — verifies the instrumentation sites actually emit on the central
/// <see cref="OtOpcUaTelemetry"/> meter + activity source. Each test attaches a one-shot
/// listener, exercises the instrumented path, then asserts the recorded measurement matches.
/// </summary>
public sealed class OtOpcUaTelemetryHookTests : RuntimeActorTestBase
{
    /// <summary>
    /// Sends a dependency change to a <see cref="VirtualTagActor"/> backed by a constant
    /// evaluator and checks that the <c>otopcua.virtualtag.eval</c> instrument recorded at
    /// least one measurement tagged <c>outcome=ok</c>.
    /// </summary>
    [Fact]
    public void VirtualTagActor_evaluation_emits_otopcua_virtualtag_eval_counter()
    {
        using var recorder = new MeterRecorder("otopcua.virtualtag.eval");

        var parent = CreateTestProbe();
        var evaluator = new ConstEval(42);
        var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-tel-1", "expr", evaluator: evaluator));

        actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow));

        // NOTE(review): the concrete message type was lost from this file; <object> accepts
        // whatever the actor sends to its parent. Narrow to the real result type if known.
        parent.ExpectMsg<object>();

        recorder.Total.ShouldBeGreaterThanOrEqualTo(1);
        recorder.WithTag("outcome", "ok").ShouldBeGreaterThanOrEqualTo(1);
    }

    /// <summary>
    /// Sends an <c>AttributeValueUpdate</c> to an <see cref="OpcUaPublishActor"/> and polls until
    /// the <c>otopcua.opcua.sink.write</c> instrument has recorded a measurement tagged
    /// <c>kind=value</c>.
    /// </summary>
    [Fact]
    public void OpcUaPublishActor_AttributeValueUpdate_emits_sink_write_counter()
    {
        using var recorder = new MeterRecorder("otopcua.opcua.sink.write");

        var sink = new RecordingSink();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
            sink: sink,
            serviceLevel: NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic: false,
            localNode: Commons.Types.NodeId.Parse("test-node")));

        actor.Tell(new OpcUaPublishActor.AttributeValueUpdate(
            NodeId: "ns=2;s=tag-1",
            Value: 42,
            Quality: OpcUaQuality.Good,
            TimestampUtc: DateTime.UtcNow));

        // The actor handles the message asynchronously, so poll instead of asserting once.
        AwaitAssertion(() =>
        {
            recorder.Total.ShouldBeGreaterThanOrEqualTo(1);
            recorder.WithTag("kind", "value").ShouldBeGreaterThanOrEqualTo(1);
        });
    }

    /// <summary>
    /// Sends a <c>RebuildAddressSpace</c> message to an <see cref="OpcUaPublishActor"/> and polls
    /// until an activity named <c>otopcua.opcua.address_space_rebuild</c> has been started.
    /// </summary>
    [Fact]
    public void RebuildAddressSpace_starts_an_address_space_rebuild_span()
    {
        using var spanRecorder = new ActivityRecorder("otopcua.opcua.address_space_rebuild");

        var sink = new RecordingSink();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
            sink: sink,
            serviceLevel: NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic: false,
            localNode: Commons.Types.NodeId.Parse("test-node")));

        actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(Commons.Types.CorrelationId.NewId()));

        AwaitAssertion(() =>
            spanRecorder.Activities.ShouldContain(a => a.OperationName == "otopcua.opcua.address_space_rebuild"));
    }

    /// <summary>
    /// Re-runs <paramref name="assertion"/> every 25 ms for up to 2 seconds until it stops
    /// throwing. If it still fails when the time budget is spent, one last attempt is made
    /// outside the retry guard so the failure propagates with its own stack trace.
    /// </summary>
    /// <param name="assertion">Assertion to poll; it is invoked repeatedly and must tolerate that.</param>
    private void AwaitAssertion(Action assertion)
    {
        var timeout = TimeSpan.FromSeconds(2);
        var pollInterval = TimeSpan.FromMilliseconds(25);

        // Stopwatch is monotonic, so a wall-clock adjustment cannot shorten or extend the wait.
        var elapsed = Stopwatch.StartNew();

        while (elapsed.Elapsed < timeout)
        {
            try
            {
                assertion();
                return;
            }
            catch (Exception)
            {
                // Deliberately swallowed: the condition is not met yet, try again shortly.
                Thread.Sleep(pollInterval);
            }
        }

        // Final unguarded attempt: guarantees the assertion runs at least once and that a
        // failure is reported from the assertion itself rather than from a rethrow site.
        assertion();
    }

    /// <summary>Listens to a single instrument by name and tallies the values + tags.</summary>
    private sealed class MeterRecorder : IDisposable
    {
        private readonly string _name;
        private readonly MeterListener _listener;
        private long _total;
        private readonly List<KeyValuePair<string, object?>[]> _tagSets = new();
        private readonly object _gate = new();

        /// <summary>
        /// Starts listening for the instrument called <paramref name="instrumentName"/> on the
        /// meter named <see cref="OtOpcUaTelemetry.MeterName"/>.
        /// </summary>
        /// <param name="instrumentName">Exact instrument name to subscribe to.</param>
        public MeterRecorder(string instrumentName)
        {
            _name = instrumentName;
            _listener = new MeterListener
            {
                InstrumentPublished = (instrument, listener) =>
                {
                    if (instrument.Meter.Name == OtOpcUaTelemetry.MeterName && instrument.Name == _name)
                    {
                        listener.EnableMeasurementEvents(instrument);
                    }
                }
            };

            // NOTE(review): only measurements of type long reach this callback — confirm the
            // instruments under test are declared with a long value type.
            _listener.SetMeasurementEventCallback<long>((_, value, tags, _) =>
            {
                lock (_gate)
                {
                    _total += value;

                    // The tag span is only valid during the callback, so copy it.
                    _tagSets.Add(tags.ToArray());
                }
            });

            _listener.Start();
        }

        /// <summary>Sum of every measurement value recorded so far.</summary>
        public long Total
        {
            get
            {
                lock (_gate)
                {
                    return _total;
                }
            }
        }

        /// <summary>
        /// Counts the recorded measurements carrying a tag whose key is <paramref name="key"/>
        /// and whose value equals <paramref name="value"/>.
        /// </summary>
        /// <param name="key">Tag key to look for.</param>
        /// <param name="value">Expected tag value.</param>
        /// <returns>Number of recorded measurements with a matching tag.</returns>
        public int WithTag(string key, string value)
        {
            lock (_gate)
            {
                return _tagSets.Count(set => set.Any(t => t.Key == key && Equals(t.Value, value)));
            }
        }

        /// <summary>Disposes the underlying listener.</summary>
        public void Dispose() => _listener.Dispose();
    }

    /// <summary>Listens to a single ActivitySource by name and stores started Activities.</summary>
    private sealed class ActivityRecorder : IDisposable
    {
        private readonly string _operationName;
        private readonly ActivityListener _listener;
        private readonly List<Activity> _activities = new();
        private readonly object _gate = new();

        /// <summary>
        /// Registers a listener on the source named <see cref="OtOpcUaTelemetry.ActivitySourceName"/>
        /// that keeps every started activity whose operation name is <paramref name="operationName"/>.
        /// </summary>
        /// <param name="operationName">Operation name of the activities to keep.</param>
        public ActivityRecorder(string operationName)
        {
            _operationName = operationName;
            _listener = new ActivityListener
            {
                ShouldListenTo = source => source.Name == OtOpcUaTelemetry.ActivitySourceName,
                Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
                ActivityStarted = activity =>
                {
                    if (activity.OperationName == _operationName)
                    {
                        lock (_gate)
                        {
                            _activities.Add(activity);
                        }
                    }
                }
            };

            ActivitySource.AddActivityListener(_listener);
        }

        /// <summary>Snapshot copy of the activities captured so far.</summary>
        public IReadOnlyList<Activity> Activities
        {
            get
            {
                lock (_gate)
                {
                    return _activities.ToArray();
                }
            }
        }

        /// <summary>Disposes the underlying listener.</summary>
        public void Dispose() => _listener.Dispose();
    }

    /// <summary>Evaluator stub that returns the same successful value for every evaluation.</summary>
    private sealed class ConstEval(object? value) : IVirtualTagEvaluator
    {
        // NOTE(review): the dictionary's type arguments were lost from this file;
        // <string, object?> is assumed — confirm against IVirtualTagEvaluator.
        public VirtualTagEvalResult Evaluate(
            string virtualTagId,
            string expression,
            IReadOnlyDictionary<string, object?> dependencies) => VirtualTagEvalResult.Ok(value);
    }

    /// <summary>Sink stub that counts value and alarm-state writes and ignores rebuild requests.</summary>
    private sealed class RecordingSink : IOpcUaAddressSpaceSink
    {
        /// <summary>Number of <c>WriteValue</c> and <c>WriteAlarmState</c> calls received.</summary>
        public int Writes { get; private set; }

        public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) => Writes++;

        public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime occurredUtc) => Writes++;

        public void RebuildAddressSpace()
        {
            /* recorded via span */
        }
    }
}