From 619207e7f532c63e6621c7fde77b034d6fd11212 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 29 Apr 2026 16:36:47 -0400 Subject: [PATCH] =?UTF-8?q?PR=206.1=20=E2=80=94=20OpenTelemetry=20traces?= =?UTF-8?q?=20around=20gw=20calls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-box ActivitySource ("ZB.MOM.WW.OtOpcUa.Driver.Galaxy") wrapped around the three gw-facing seams via decorators: - TracedGalaxySubscriber — galaxy.subscribe_bulk / galaxy.unsubscribe_bulk / galaxy.stream_events spans. Stream span covers the entire stream lifetime with a galaxy.event_count tag (per-event spans would dominate the trace volume at 50k tags / 1Hz; PR 6.2 owns per-event metrics). - TracedGalaxyDataWriter — galaxy.write spans tagged with galaxy.tag_count, galaxy.secured_write_count (split between FreeAccess/Operate vs Tune/Configure/VerifiedWrite, computed only when a listener is recording so the hot path stays free), galaxy.success_count. - TracedGalaxyHierarchySource — galaxy.get_hierarchy spans tagged with galaxy.object_count. GalaxyDriver.BuildProductionRuntimeAsync wraps the production seams in the decorators. The driver itself doesn't take an OpenTelemetry package dependency — System.Diagnostics.ActivitySource is in-box; the host process picks the listener. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Browse/TracedGalaxyHierarchySource.cs | 30 +++ .../GalaxyDriver.cs | 15 +- .../Runtime/GalaxyTelemetry.cs | 35 ++++ .../Runtime/TracedGalaxyDataWriter.cs | 54 ++++++ .../Runtime/TracedGalaxySubscriber.cs | 91 +++++++++ .../Runtime/GalaxyTelemetryTests.cs | 179 ++++++++++++++++++ 6 files changed, 401 insertions(+), 3 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/TracedGalaxyHierarchySource.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyTelemetry.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxyDataWriter.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxySubscriber.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyTelemetryTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/TracedGalaxyHierarchySource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/TracedGalaxyHierarchySource.cs new file mode 100644 index 0000000..fc641ad --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/TracedGalaxyHierarchySource.cs @@ -0,0 +1,30 @@ +using MxGateway.Contracts.Proto.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +/// +/// PR 6.1 — Decorator that emits one span +/// per GetHierarchy RPC. galaxy.object_count on the span lets ops +/// correlate slow Discover passes with Galaxy size without instrumenting the +/// discoverer's translation step. 
+/// +internal sealed class TracedGalaxyHierarchySource(IGalaxyHierarchySource inner, string clientName) : IGalaxyHierarchySource +{ + public async Task> GetHierarchyAsync(CancellationToken cancellationToken) + { + using var activity = GalaxyTelemetry.ActivitySource.StartActivity("galaxy.get_hierarchy"); + activity?.SetTag("galaxy.client", clientName); + try + { + var hierarchy = await inner.GetHierarchyAsync(cancellationToken).ConfigureAwait(false); + activity?.SetTag("galaxy.object_count", hierarchy.Count); + return hierarchy; + } + catch (Exception ex) + { + activity.RecordError(ex); + throw; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index cb8d6a7..42b44d4 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -185,8 +185,16 @@ public sealed class GalaxyDriver _ownedMxSession = new GalaxyMxSession(_options.MxAccess, _logger); await _ownedMxSession.ConnectAsync(clientOptions, cancellationToken).ConfigureAwait(false); - _subscriber = new GatewayGalaxySubscriber(_ownedMxSession); - _dataWriter = new GatewayGalaxyDataWriter(_ownedMxSession, _options.MxAccess.WriteUserId, _logger); + // PR 6.1 — wrap the gw-facing seams in tracing decorators so every Subscribe / + // Unsubscribe / Write / StreamEvents call emits a span on the + // "ZB.MOM.WW.OtOpcUa.Driver.Galaxy" ActivitySource. The host process's tracing + // listener (OTLP exporter, dotnet-trace, etc.) consumes these without the driver + // taking a dependency on the OpenTelemetry packages. 
+ _subscriber = new TracedGalaxySubscriber( + new GatewayGalaxySubscriber(_ownedMxSession), _options.MxAccess.ClientName); + _dataWriter = new TracedGalaxyDataWriter( + new GatewayGalaxyDataWriter(_ownedMxSession, _options.MxAccess.WriteUserId, _logger), + _options.MxAccess.ClientName); _supervisor = new ReconnectSupervisor( reopen: ReopenAsync, @@ -559,7 +567,8 @@ public sealed class GalaxyDriver : null, }; _ownedRepositoryClient = GalaxyRepositoryClient.Create(clientOptions); - return new GatewayGalaxyHierarchySource(_ownedRepositoryClient); + return new TracedGalaxyHierarchySource( + new GatewayGalaxyHierarchySource(_ownedRepositoryClient), _options.MxAccess.ClientName); } public void Dispose() diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyTelemetry.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyTelemetry.cs new file mode 100644 index 0000000..ec11df6 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyTelemetry.cs @@ -0,0 +1,35 @@ +using System.Diagnostics; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// PR 6.1 — In-box wired around every gw call the +/// driver makes (Subscribe/Unsubscribe, Write/WriteSecured, GetHierarchy). The +/// decorators in this folder produce one span per call, tagged with the inputs +/// ops needs to triage a slow or failing operation: +/// galaxy.tag_count, galaxy.success_count, galaxy.client. +/// +/// The driver itself doesn't take a dependency on the OpenTelemetry packages — +/// System.Diagnostics.ActivitySource is in the BCL. The host process +/// decides which listener (OTLP exporter, Application Insights, dotnet-trace) +/// subscribes to . +/// +/// +internal static class GalaxyTelemetry +{ + public const string ActivitySourceName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy"; + + public static readonly ActivitySource ActivitySource = new(ActivitySourceName); + + /// + /// Tag a span with a failure reason and set its status to Error. 
Helper + /// so the decorators don't repeat the four-line idiom on every catch block. + /// + public static void RecordError(this Activity? activity, Exception ex) + { + if (activity is null) return; + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.SetTag("exception.type", ex.GetType().FullName); + activity.SetTag("exception.message", ex.Message); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxyDataWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxyDataWriter.cs new file mode 100644 index 0000000..8bb2be7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxyDataWriter.cs @@ -0,0 +1,54 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// PR 6.1 — Decorator that emits one span +/// per gw write batch. Tags secured-write counts so ops can see the routing-by- +/// classification split (FreeAccess/Operate vs Tune/Configure) without re-reading +/// the discovery dictionary. +/// +internal sealed class TracedGalaxyDataWriter(IGalaxyDataWriter inner, string clientName) : IGalaxyDataWriter +{ + public async Task> WriteAsync( + IReadOnlyList writes, + Func securityResolver, + CancellationToken cancellationToken) + { + using var activity = GalaxyTelemetry.ActivitySource.StartActivity("galaxy.write"); + activity?.SetTag("galaxy.client", clientName); + activity?.SetTag("galaxy.tag_count", writes.Count); + + if (activity is { IsAllDataRequested: true }) + { + // Counting the secured-write split is cheap (one resolver call per request) + // and only happens when a tracing listener is actively recording — keeps the + // hot path free when no one's listening. 
+ var securedCount = 0; + foreach (var w in writes) + { + var sc = securityResolver(w.FullReference); + if (sc is SecurityClassification.Tune + or SecurityClassification.Configure + or SecurityClassification.VerifiedWrite) + { + securedCount++; + } + } + activity.SetTag("galaxy.secured_write_count", securedCount); + } + + try + { + var results = await inner.WriteAsync(writes, securityResolver, cancellationToken) + .ConfigureAwait(false); + activity?.SetTag("galaxy.success_count", results.Count(r => r.StatusCode < 0x80000000u)); + return results; + } + catch (Exception ex) + { + activity.RecordError(ex); + throw; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxySubscriber.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxySubscriber.cs new file mode 100644 index 0000000..9cd0dcd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/TracedGalaxySubscriber.cs @@ -0,0 +1,91 @@ +using System.Runtime.CompilerServices; +using MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// PR 6.1 — Decorator that emits one span +/// per gw subscription RPC. Wraps the production ; +/// tests substitute a fake at the same seam without taking the tracing overhead. 
+/// +internal sealed class TracedGalaxySubscriber(IGalaxySubscriber inner, string clientName) : IGalaxySubscriber +{ + public async Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + { + using var activity = GalaxyTelemetry.ActivitySource.StartActivity("galaxy.subscribe_bulk"); + activity?.SetTag("galaxy.client", clientName); + activity?.SetTag("galaxy.tag_count", fullReferences.Count); + activity?.SetTag("galaxy.buffered_interval_ms", bufferedUpdateIntervalMs); + try + { + var results = await inner.SubscribeBulkAsync(fullReferences, bufferedUpdateIntervalMs, cancellationToken) + .ConfigureAwait(false); + activity?.SetTag("galaxy.success_count", results.Count(r => r.WasSuccessful)); + return results; + } + catch (Exception ex) + { + activity.RecordError(ex); + throw; + } + } + + public async Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + { + using var activity = GalaxyTelemetry.ActivitySource.StartActivity("galaxy.unsubscribe_bulk"); + activity?.SetTag("galaxy.client", clientName); + activity?.SetTag("galaxy.tag_count", itemHandles.Count); + try + { + await inner.UnsubscribeBulkAsync(itemHandles, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + activity.RecordError(ex); + throw; + } + } + + /// + /// Streaming RPC — one parent span covers the entire stream lifetime. Per-event + /// spans would dominate the trace volume at 50k tags / 1Hz; ops gets per-event + /// visibility through 's metrics in PR 6.2 instead. + /// + public async IAsyncEnumerable StreamEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + using var activity = GalaxyTelemetry.ActivitySource.StartActivity("galaxy.stream_events"); + activity?.SetTag("galaxy.client", clientName); + + IAsyncEnumerator? 
enumerator = null; + try + { + enumerator = inner.StreamEventsAsync(cancellationToken).GetAsyncEnumerator(cancellationToken); + var eventCount = 0L; + while (true) + { + bool moveNext; + try + { + moveNext = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + activity.RecordError(ex); + activity?.SetTag("galaxy.event_count", eventCount); + throw; + } + + if (!moveNext) break; + eventCount++; + yield return enumerator.Current; + } + activity?.SetTag("galaxy.event_count", eventCount); + } + finally + { + if (enumerator is not null) await enumerator.DisposeAsync().ConfigureAwait(false); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyTelemetryTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyTelemetryTests.cs new file mode 100644 index 0000000..7804e8d --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyTelemetryTests.cs @@ -0,0 +1,179 @@ +using System.Diagnostics; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// PR 6.1 — pins that every gw-facing call produces a span on the +/// ZB.MOM.WW.OtOpcUa.Driver.Galaxy ActivitySource. We listen via +/// rather than asserting on internal state, so the +/// tests double as documentation of the listener-side contract. +/// +public sealed class GalaxyTelemetryTests +{ + /// Subscribes an ActivityListener for the test, captures each spawned activity. 
+ private static (ActivityListener Listener, List Captured) StartCapture() + { + var captured = new List(); + var listener = new ActivityListener + { + ShouldListenTo = src => src.Name == GalaxyTelemetry.ActivitySourceName, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => captured.Add(activity), + }; + ActivitySource.AddActivityListener(listener); + return (listener, captured); + } + + [Fact] + public async Task TracedGalaxySubscriber_emits_subscribe_bulk_span_with_tag_count() + { + var (listener, captured) = StartCapture(); + try + { + var inner = new FakeSubscriber(); + var sut = new TracedGalaxySubscriber(inner, "OtOpcUa-Test"); + await sut.SubscribeBulkAsync(["A", "B", "C"], 500, CancellationToken.None); + + var span = captured.ShouldHaveSingleItem(); + span.OperationName.ShouldBe("galaxy.subscribe_bulk"); + span.GetTagItem("galaxy.client").ShouldBe("OtOpcUa-Test"); + span.GetTagItem("galaxy.tag_count").ShouldBe(3); + span.GetTagItem("galaxy.buffered_interval_ms").ShouldBe(500); + span.GetTagItem("galaxy.success_count").ShouldBe(3); + } + finally { listener.Dispose(); } + } + + [Fact] + public async Task TracedGalaxySubscriber_records_error_and_rethrows_on_failure() + { + var (listener, captured) = StartCapture(); + try + { + var sut = new TracedGalaxySubscriber(new ThrowingSubscriber(), "OtOpcUa-Test"); + await Should.ThrowAsync(() => + sut.SubscribeBulkAsync(["A"], 0, CancellationToken.None)); + + var span = captured.ShouldHaveSingleItem(); + span.Status.ShouldBe(ActivityStatusCode.Error); + span.GetTagItem("exception.type").ShouldBe(typeof(InvalidOperationException).FullName); + } + finally { listener.Dispose(); } + } + + [Fact] + public async Task TracedGalaxyDataWriter_tags_secured_write_count() + { + var (listener, captured) = StartCapture(); + try + { + var inner = new RecordingWriter(); + var sut = new TracedGalaxyDataWriter(inner, "OtOpcUa-Test"); + + var requests = new[] + { + new 
WriteRequest("FreeTag", 1.0), + new WriteRequest("OperateTag", 2.0), + new WriteRequest("TuneTag", 3.0), + new WriteRequest("ConfigTag", 4.0), + }; + SecurityClassification Resolver(string fullRef) => fullRef switch + { + "FreeTag" => SecurityClassification.FreeAccess, + "OperateTag" => SecurityClassification.Operate, + "TuneTag" => SecurityClassification.Tune, + "ConfigTag" => SecurityClassification.Configure, + _ => SecurityClassification.FreeAccess, + }; + + await sut.WriteAsync(requests, Resolver, CancellationToken.None); + + var span = captured.ShouldHaveSingleItem(); + span.OperationName.ShouldBe("galaxy.write"); + span.GetTagItem("galaxy.tag_count").ShouldBe(4); + span.GetTagItem("galaxy.secured_write_count").ShouldBe(2); // Tune + Configure + } + finally { listener.Dispose(); } + } + + [Fact] + public async Task TracedGalaxyHierarchySource_tags_object_count() + { + var (listener, captured) = StartCapture(); + try + { + var sut = new TracedGalaxyHierarchySource(new FakeHierarchy(), "OtOpcUa-Test"); + var hierarchy = await sut.GetHierarchyAsync(CancellationToken.None); + hierarchy.Count.ShouldBe(2); + + var span = captured.ShouldHaveSingleItem(); + span.OperationName.ShouldBe("galaxy.get_hierarchy"); + span.GetTagItem("galaxy.object_count").ShouldBe(2); + } + finally { listener.Dispose(); } + } + + private sealed class FakeSubscriber : IGalaxySubscriber + { + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => Task.FromResult>( + fullReferences.Select((r, i) => new SubscribeResult + { + TagAddress = r, + ItemHandle = i + 1, + WasSuccessful = true, + }).ToList()); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public async IAsyncEnumerable StreamEventsAsync( + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield 
break; + } + } + + private sealed class ThrowingSubscriber : IGalaxySubscriber + { + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => throw new InvalidOperationException("gw down"); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public async IAsyncEnumerable StreamEventsAsync( + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + } + + private sealed class RecordingWriter : IGalaxyDataWriter + { + public Task> WriteAsync( + IReadOnlyList writes, + Func securityResolver, + CancellationToken cancellationToken) + => Task.FromResult>( + writes.Select(_ => new WriteResult(0u)).ToList()); + } + + private sealed class FakeHierarchy : IGalaxyHierarchySource + { + public Task> GetHierarchyAsync( + CancellationToken cancellationToken) + => Task.FromResult>( + [new(), new()]); + } +}