560b327ee1
v2-ci / build (push) Failing after 33s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Imports the freshly-rebuilt ZB.MOM.WW.MxGateway.Client + ZB.MOM.WW.MxGateway.Contracts nupkgs (0.1.0) from /tmp/mxgw-dist. Replaces the vendored libs/ DLLs and the pre-restructure MxGateway.* namespaces across the runtime Galaxy driver, Galaxy.Browser, and their tests. Key changes: - nuget-packages/ added as a local feed via NuGet.config; .gitignore exempts it from the *.nupkg rule so the packages are tracked - Directory.Packages.props pins both packages at 0.1.0 - 4 csprojs swap <Reference HintPath="libs/...dll"/> for <PackageReference/> - 36 .cs files renamed `using MxGateway.*` -> `using ZB.MOM.WW.MxGateway.*` - libs/ removed (vendored DLLs + README.md) GalaxyBrowseSession rewritten around the new lazy API: - RootAsync calls GalaxyRepositoryClient.BrowseAsync (returns LazyBrowseNodes) and caches them by TagName instead of bulk-fetching the whole hierarchy - ExpandAsync looks up the cached LazyBrowseNode and calls its ExpandAsync, giving true one-wire-call-per-click instead of in-memory parent/child scan - _byGobjectId + _hasChildrenSet dropped (LazyBrowseNode carries HasChildrenHint) - AttributesAsync unchanged (already uses DiscoverHierarchyAsync MaxDepth=0) Tests: Galaxy.Tests 245/245, Galaxy.Browser.Tests 10/10, AdminUI.Tests 66/66. Pre-existing 12 solution errors unchanged (test sinks + Cli XML comments).
229 lines
10 KiB
C#
229 lines
10 KiB
C#
using System.Diagnostics.Metrics;
|
|
using System.Threading.Channels;
|
|
using Google.Protobuf.WellKnownTypes;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
|
|
|
/// <summary>
|
|
/// PR 6.2 — pins the EventPump's bounded-channel + drop-newest behavior. We
|
|
/// hold the dispatch loop with a slow handler so the channel fills, then verify
|
|
/// the producer keeps reading from the gw stream and increments the
|
|
/// <c>galaxy.events.dropped</c> counter rather than blocking.
|
|
/// </summary>
|
|
public sealed class EventPumpBoundedChannelTests
|
|
{
|
|
/// <summary>
/// Verifies that, with a capacity-2 channel and a dispatcher held inside its handler,
/// the pump keeps counting every emitted event as received and records at least one
/// drop on the <c>galaxy.events.dropped</c> counter.
/// </summary>
/// <remarks>
/// The handler gate is released in a <c>finally</c> so that a failing assertion cannot
/// leave the handler parked while the pump is being disposed.
/// </remarks>
[Fact]
public async Task Drops_newest_when_channel_fills_and_records_metric()
{
    const int totalEvents = 10;

    // Declared first so it is disposed last, after the pump has been torn down.
    using var counters = StartMeterCapture();

    var subscriber = new ManualSubscriber();
    var registry = new SubscriptionRegistry();
    registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

    // Tiny channel + held handler ⇒ the channel fills up and the pump is expected to
    // drop the overflow instead of blocking; the assertions below pin exactly that.
    var dispatchGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    await using var pump = new EventPump(
        subscriber, registry, channelCapacity: 2, clientName: "PumpTest");
    pump.OnDataChange += async (_, _) =>
    {
        // Park the dispatch loop until the test releases the gate.
        await dispatchGate.Task.ConfigureAwait(false);
    };

    try
    {
        pump.Start();

        for (var i = 0; i < totalEvents; i++)
        {
            await subscriber.EmitAsync(itemHandle: 7, value: i);
        }

        // Poll (bounded) until the producer has seen every event and recorded a drop,
        // rather than sleeping a fixed interval that races on a loaded machine.
        var deadline = DateTime.UtcNow.AddSeconds(5);
        while (DateTime.UtcNow < deadline
               && (counters.Received < totalEvents || counters.Dropped == 0))
        {
            await Task.Delay(10);
        }

        // Capacity 2 + 1 in-flight in the dispatcher = 3 may have been accepted; the
        // remainder should have hit the dropped counter. Don't pin exact counts —
        // the scheduler can interleave; pin the invariants instead.
        counters.Received.ShouldBeGreaterThanOrEqualTo(totalEvents);
        counters.Dropped.ShouldBeGreaterThan(0,
            "with capacity=2 and a held dispatcher we must drop at least one of 10 events");
        (counters.Received).ShouldBe(counters.Dispatched + counters.Dropped + counters.InFlight,
            "received = dispatched + dropped + (events still queued)");
    }
    finally
    {
        // Always release the dispatcher — also when an assertion above threw — so the
        // pump's DisposeAsync (which runs when this method's scope exits) can drain.
        dispatchGate.TrySetResult();
    }
}
|
|
|
|
/// <summary>
/// Verifies that constructing an <c>EventPump</c> with a channel capacity of zero or
/// a negative number throws <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
[Fact]
public async Task Throws_when_channelCapacity_is_invalid()
{
    var subscriber = new ManualSubscriber();
    var registry = new SubscriptionRegistry();

    // Same check for each rejected capacity: zero first, then a negative value.
    foreach (var invalidCapacity in new[] { 0, -1 })
    {
        Should.Throw<ArgumentOutOfRangeException>(
            () => new EventPump(subscriber, registry, channelCapacity: invalidCapacity));
    }

    // Nothing above is asynchronous; this await only satisfies the async signature.
    await Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Verifies that measurements published by a pump created with
/// <c>clientName: "Driver-X"</c> carry a <c>galaxy.client</c> tag with that value, and
/// that a <c>galaxy.events.received</c> measurement is among them.
/// </summary>
[Fact]
public async Task Tags_metrics_with_client_name_for_multi_driver_hosts()
{
    const string clientName = "Driver-X";
    const string clientTagKey = "galaxy.client";
    const string receivedInstrument = "galaxy.events.received";

    var captured = new List<(string Instrument, KeyValuePair<string, object?>[] Tags)>();

    // True when the measurement carries galaxy.client=Driver-X. Uses a type pattern
    // rather than a hard cast so a non-string tag value simply doesn't match.
    bool IsOurs((string Instrument, KeyValuePair<string, object?>[] Tags) measurement) =>
        measurement.Tags.Any(tag =>
            tag.Key == clientTagKey
            && tag.Value is string client
            && string.Equals(client, clientName, StringComparison.Ordinal));

    using var listener = new MeterListener();
    listener.InstrumentPublished = (instr, l) =>
    {
        if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr);
    };
    // The callback fires on the thread that calls Counter.Add() — that is the
    // RunAsync background Task. Use lock(captured) everywhere to avoid torn reads.
    listener.SetMeasurementEventCallback<long>((instr, _, tags, _) =>
    {
        lock (captured) { captured.Add((instr.Name, tags.ToArray())); }
    });
    listener.Start();

    var subscriber = new ManualSubscriber();
    var registry = new SubscriptionRegistry();
    registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

    await using (var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: clientName))
    {
        pump.Start();
        await subscriber.EmitAsync(7, 42.0);

        // Poll until at least one galaxy.events.received measurement tagged
        // galaxy.client=Driver-X lands in the listener, rather than using a
        // fixed delay that races under parallel test load on a busy box.
        // The outcome is asserted after the pump is disposed; on timeout we just
        // fall through and let those assertions report the failure.
        var deadline = DateTime.UtcNow.AddSeconds(5);
        while (DateTime.UtcNow < deadline)
        {
            listener.RecordObservableInstruments();

            bool arrived;
            lock (captured)
            {
                arrived = captured.Any(m => m.Instrument == receivedInstrument && IsOurs(m));
            }

            if (arrived)
            {
                break;
            }

            await Task.Delay(25);
        }
    }

    // The static Meter is shared across all EventPump instances in the test
    // assembly; xUnit may run other pump tests in parallel and their
    // measurements land on the same listener. Filter to our pump's tag value.
    List<(string Instrument, KeyValuePair<string, object?>[] Tags)> ours;
    lock (captured)
    {
        ours = captured.Where(IsOurs).ToList();
    }

    ours.ShouldNotBeEmpty(
        "at least one measurement from this test's pump must carry galaxy.client=Driver-X");
    ours.ShouldContain(c => c.Instrument == receivedInstrument);
}
|
|
|
|
/// <summary>
/// Starts a <see cref="MeterListener"/> on every instrument of the meter named
/// <c>EventPump.MeterName</c> and sums the <c>long</c> measurements of the
/// <c>galaxy.events.received</c>, <c>galaxy.events.dispatched</c> and
/// <c>galaxy.events.dropped</c> instruments into the returned capture.
/// </summary>
/// <returns>A capture that owns the started listener; dispose it to stop listening.</returns>
private static CounterCapture StartMeterCapture()
{
    var totals = new CounterCapture();

    var meterListener = new MeterListener
    {
        // Only subscribe to instruments that belong to the pump's meter.
        InstrumentPublished = (instrument, self) =>
        {
            if (instrument.Meter.Name == EventPump.MeterName)
            {
                self.EnableMeasurementEvents(instrument);
            }
        },
    };

    // Measurements may arrive on another thread, hence the Interlocked adds.
    meterListener.SetMeasurementEventCallback<long>((instrument, delta, _, _) =>
    {
        var instrumentName = instrument.Name;
        if (instrumentName == "galaxy.events.received")
        {
            Interlocked.Add(ref totals._received, delta);
        }
        else if (instrumentName == "galaxy.events.dispatched")
        {
            Interlocked.Add(ref totals._dispatched, delta);
        }
        else if (instrumentName == "galaxy.events.dropped")
        {
            Interlocked.Add(ref totals._dropped, delta);
        }
    });

    meterListener.Start();
    totals.Listener = meterListener;
    return totals;
}
|
|
|
|
/// <summary>
/// Running totals for the three pump counters, plus the listener that feeds them.
/// Disposing the capture disposes that listener.
/// </summary>
private sealed class CounterCapture : IDisposable
{
    /// <summary>The listener feeding this capture; <c>null</c> until one is assigned.</summary>
    public MeterListener? Listener;

    // Raw totals. They are mutated from outside this class with Interlocked.Add
    // (see StartMeterCapture), so they are only read through Interlocked.Read below.
    internal long _received;
    internal long _dispatched;
    internal long _dropped;

    /// <summary>Gets the total of received-event measurements seen so far.</summary>
    public long Received
    {
        get { return Interlocked.Read(ref _received); }
    }

    /// <summary>Gets the total of dispatched-event measurements seen so far.</summary>
    public long Dispatched
    {
        get { return Interlocked.Read(ref _dispatched); }
    }

    /// <summary>Gets the total of dropped-event measurements seen so far.</summary>
    public long Dropped
    {
        get { return Interlocked.Read(ref _dropped); }
    }

    /// <summary>
    /// Gets the number of received events that are neither dispatched nor dropped yet,
    /// clamped at zero. The three totals are read one after another, not atomically.
    /// </summary>
    public long InFlight
    {
        get
        {
            var unaccounted = Received - Dispatched - Dropped;
            return unaccounted < 0 ? 0 : unaccounted;
        }
    }

    /// <summary>Disposes the listener, if one was assigned.</summary>
    public void Dispose()
    {
        Listener?.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Hand-driven <see cref="IGalaxySubscriber"/> for tests: subscribe and unsubscribe
/// calls are no-ops, and the event stream yields exactly what the test pushes in
/// through <see cref="EmitAsync"/>.
/// </summary>
private sealed class ManualSubscriber : IGalaxySubscriber
{
    // Unbounded, so EmitAsync never waits for capacity on this side.
    private readonly Channel<MxEvent> _stream = Channel.CreateUnbounded<MxEvent>(
        new UnboundedChannelOptions { SingleReader = true });

    /// <summary>No-op subscribe; the arguments are ignored.</summary>
    /// <param name="fullReferences">The full references to subscribe to (unused).</param>
    /// <param name="bufferedUpdateIntervalMs">The buffered update interval in milliseconds (unused).</param>
    /// <param name="cancellationToken">The cancellation token (unused).</param>
    /// <returns>A completed task holding an empty result list.</returns>
    public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
        IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
    {
        IReadOnlyList<SubscribeResult> noResults = [];
        return Task.FromResult(noResults);
    }

    /// <summary>No-op unsubscribe; the arguments are ignored.</summary>
    /// <param name="itemHandles">The item handles to unsubscribe (unused).</param>
    /// <param name="cancellationToken">The cancellation token (unused).</param>
    /// <returns>A completed task.</returns>
    public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Exposes everything written by <see cref="EmitAsync"/> as an async stream.</summary>
    /// <param name="cancellationToken">Token passed to the underlying channel read.</param>
    /// <returns>An async enumerable over the emitted <c>MxEvent</c> objects.</returns>
    public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
    {
        return _stream.Reader.ReadAllAsync(cancellationToken);
    }

    /// <summary>Pushes one <c>OnDataChange</c> event carrying a double value into the stream.</summary>
    /// <param name="itemHandle">The item handle stamped on the event.</param>
    /// <param name="value">The double value stamped on the event.</param>
    /// <returns>The value task returned by the channel write.</returns>
    public ValueTask EmitAsync(int itemHandle, double value)
    {
        var dataChange = new MxEvent
        {
            Family = MxEventFamily.OnDataChange,
            ItemHandle = itemHandle,
            Value = new MxValue { DoubleValue = value },
            // NOTE(review): 192 (0xC0) is presumably the "good" quality code — confirm.
            Quality = 192,
            SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
        };

        return _stream.Writer.WriteAsync(dataChange);
    }
}
|
|
}
|