PR 6.1 — OpenTelemetry traces around gw calls
In-box ActivitySource ("ZB.MOM.WW.OtOpcUa.Driver.Galaxy") wrapped around
the three gw-facing seams via decorators:
- TracedGalaxySubscriber — galaxy.subscribe_bulk / galaxy.unsubscribe_bulk
/ galaxy.stream_events spans. Stream span covers the entire stream
lifetime with a galaxy.event_count tag (per-event spans would dominate
the trace volume at 50k tags / 1Hz; PR 6.2 owns per-event metrics).
- TracedGalaxyDataWriter — galaxy.write spans tagged with
galaxy.tag_count, galaxy.secured_write_count (split between FreeAccess
/Operate vs Tune/Configure/VerifiedWrite, computed only when a listener
is recording so the hot path stays free), galaxy.success_count.
- TracedGalaxyHierarchySource — galaxy.get_hierarchy spans tagged with
galaxy.object_count.
GalaxyDriver.BuildProductionRuntimeAsync wraps the production seams in
the decorators. The driver itself doesn't take an OpenTelemetry package
dependency — System.Diagnostics.ActivitySource is in-box; the host
process picks the listener.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,179 @@
|
||||
using System.Diagnostics;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// PR 6.1 — pins that every gw-facing call produces a span on the
|
||||
/// <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> ActivitySource. We listen via
|
||||
/// <see cref="ActivityListener"/> rather than asserting on internal state, so the
|
||||
/// tests double as documentation of the listener-side contract.
|
||||
/// </summary>
|
||||
public sealed class GalaxyTelemetryTests
|
||||
{
|
||||
/// <summary>Subscribes an ActivityListener for the test, captures each spawned activity.</summary>
|
||||
private static (ActivityListener Listener, List<Activity> Captured) StartCapture()
|
||||
{
|
||||
var captured = new List<Activity>();
|
||||
var listener = new ActivityListener
|
||||
{
|
||||
ShouldListenTo = src => src.Name == GalaxyTelemetry.ActivitySourceName,
|
||||
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
|
||||
ActivityStopped = activity => captured.Add(activity),
|
||||
};
|
||||
ActivitySource.AddActivityListener(listener);
|
||||
return (listener, captured);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TracedGalaxySubscriber_emits_subscribe_bulk_span_with_tag_count()
|
||||
{
|
||||
var (listener, captured) = StartCapture();
|
||||
try
|
||||
{
|
||||
var inner = new FakeSubscriber();
|
||||
var sut = new TracedGalaxySubscriber(inner, "OtOpcUa-Test");
|
||||
await sut.SubscribeBulkAsync(["A", "B", "C"], 500, CancellationToken.None);
|
||||
|
||||
var span = captured.ShouldHaveSingleItem();
|
||||
span.OperationName.ShouldBe("galaxy.subscribe_bulk");
|
||||
span.GetTagItem("galaxy.client").ShouldBe("OtOpcUa-Test");
|
||||
span.GetTagItem("galaxy.tag_count").ShouldBe(3);
|
||||
span.GetTagItem("galaxy.buffered_interval_ms").ShouldBe(500);
|
||||
span.GetTagItem("galaxy.success_count").ShouldBe(3);
|
||||
}
|
||||
finally { listener.Dispose(); }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TracedGalaxySubscriber_records_error_and_rethrows_on_failure()
|
||||
{
|
||||
var (listener, captured) = StartCapture();
|
||||
try
|
||||
{
|
||||
var sut = new TracedGalaxySubscriber(new ThrowingSubscriber(), "OtOpcUa-Test");
|
||||
await Should.ThrowAsync<InvalidOperationException>(() =>
|
||||
sut.SubscribeBulkAsync(["A"], 0, CancellationToken.None));
|
||||
|
||||
var span = captured.ShouldHaveSingleItem();
|
||||
span.Status.ShouldBe(ActivityStatusCode.Error);
|
||||
span.GetTagItem("exception.type").ShouldBe(typeof(InvalidOperationException).FullName);
|
||||
}
|
||||
finally { listener.Dispose(); }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TracedGalaxyDataWriter_tags_secured_write_count()
|
||||
{
|
||||
var (listener, captured) = StartCapture();
|
||||
try
|
||||
{
|
||||
var inner = new RecordingWriter();
|
||||
var sut = new TracedGalaxyDataWriter(inner, "OtOpcUa-Test");
|
||||
|
||||
var requests = new[]
|
||||
{
|
||||
new WriteRequest("FreeTag", 1.0),
|
||||
new WriteRequest("OperateTag", 2.0),
|
||||
new WriteRequest("TuneTag", 3.0),
|
||||
new WriteRequest("ConfigTag", 4.0),
|
||||
};
|
||||
SecurityClassification Resolver(string fullRef) => fullRef switch
|
||||
{
|
||||
"FreeTag" => SecurityClassification.FreeAccess,
|
||||
"OperateTag" => SecurityClassification.Operate,
|
||||
"TuneTag" => SecurityClassification.Tune,
|
||||
"ConfigTag" => SecurityClassification.Configure,
|
||||
_ => SecurityClassification.FreeAccess,
|
||||
};
|
||||
|
||||
await sut.WriteAsync(requests, Resolver, CancellationToken.None);
|
||||
|
||||
var span = captured.ShouldHaveSingleItem();
|
||||
span.OperationName.ShouldBe("galaxy.write");
|
||||
span.GetTagItem("galaxy.tag_count").ShouldBe(4);
|
||||
span.GetTagItem("galaxy.secured_write_count").ShouldBe(2); // Tune + Configure
|
||||
}
|
||||
finally { listener.Dispose(); }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TracedGalaxyHierarchySource_tags_object_count()
|
||||
{
|
||||
var (listener, captured) = StartCapture();
|
||||
try
|
||||
{
|
||||
var sut = new TracedGalaxyHierarchySource(new FakeHierarchy(), "OtOpcUa-Test");
|
||||
var hierarchy = await sut.GetHierarchyAsync(CancellationToken.None);
|
||||
hierarchy.Count.ShouldBe(2);
|
||||
|
||||
var span = captured.ShouldHaveSingleItem();
|
||||
span.OperationName.ShouldBe("galaxy.get_hierarchy");
|
||||
span.GetTagItem("galaxy.object_count").ShouldBe(2);
|
||||
}
|
||||
finally { listener.Dispose(); }
|
||||
}
|
||||
|
||||
private sealed class FakeSubscriber : IGalaxySubscriber
|
||||
{
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
=> Task.FromResult<IReadOnlyList<SubscribeResult>>(
|
||||
fullReferences.Select((r, i) => new SubscribeResult
|
||||
{
|
||||
TagAddress = r,
|
||||
ItemHandle = i + 1,
|
||||
WasSuccessful = true,
|
||||
}).ToList());
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ThrowingSubscriber : IGalaxySubscriber
|
||||
{
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
=> throw new InvalidOperationException("gw down");
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingWriter : IGalaxyDataWriter
|
||||
{
|
||||
public Task<IReadOnlyList<WriteResult>> WriteAsync(
|
||||
IReadOnlyList<WriteRequest> writes,
|
||||
Func<string, SecurityClassification> securityResolver,
|
||||
CancellationToken cancellationToken)
|
||||
=> Task.FromResult<IReadOnlyList<WriteResult>>(
|
||||
writes.Select(_ => new WriteResult(0u)).ToList());
|
||||
}
|
||||
|
||||
private sealed class FakeHierarchy : IGalaxyHierarchySource
|
||||
{
|
||||
public Task<IReadOnlyList<MxGateway.Contracts.Proto.Galaxy.GalaxyObject>> GetHierarchyAsync(
|
||||
CancellationToken cancellationToken)
|
||||
=> Task.FromResult<IReadOnlyList<MxGateway.Contracts.Proto.Galaxy.GalaxyObject>>(
|
||||
[new(), new()]);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user