using Xunit; using ZB.MOM.WW.LmxProxy.Client.Domain; using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; namespace ZB.MOM.WW.LmxProxy.Client.Tests; public class StreamingExtensionsTests { [Fact] public async Task ReadStreamAsync_BatchesCorrectly() { var fake = new FakeLmxProxyClient(); var addresses = Enumerable.Range(0, 250).Select(i => $"tag{i}").ToList(); var results = new List>(); await foreach (var kvp in fake.ReadStreamAsync(addresses, batchSize: 100)) { results.Add(kvp); } // 250 tags at batchSize=100 => 3 batch calls (100, 100, 50) Assert.Equal(3, fake.ReadBatchCalls.Count); Assert.Equal(100, fake.ReadBatchCalls[0].Count); Assert.Equal(100, fake.ReadBatchCalls[1].Count); Assert.Equal(50, fake.ReadBatchCalls[2].Count); Assert.Equal(250, results.Count); } [Fact] public async Task ReadStreamAsync_RetriesOnError() { var fake = new FakeLmxProxyClient { ReadBatchExceptionToThrow = new InvalidOperationException("transient"), ReadBatchExceptionCount = 1 // First call throws, second succeeds }; var addresses = Enumerable.Range(0, 5).Select(i => $"tag{i}").ToList(); var results = new List>(); await foreach (var kvp in fake.ReadStreamAsync(addresses, batchSize: 10)) { results.Add(kvp); } // Should retry: first call throws, second succeeds Assert.Equal(2, fake.ReadBatchCalls.Count); Assert.Equal(5, results.Count); } [Fact] public async Task WriteStreamAsync_BatchesAndReturnsCount() { var fake = new FakeLmxProxyClient(); var values = GenerateWriteValues(250); int total = await fake.WriteStreamAsync(values, batchSize: 100); Assert.Equal(250, total); Assert.Equal(3, fake.WriteBatchCalls.Count); Assert.Equal(100, fake.WriteBatchCalls[0].Count); Assert.Equal(100, fake.WriteBatchCalls[1].Count); Assert.Equal(50, fake.WriteBatchCalls[2].Count); } [Fact] public async Task ProcessInParallelAsync_RespectsMaxConcurrency() { int maxConcurrency = 2; int currentConcurrency = 0; int maxObservedConcurrency = 0; var lockObj = new object(); var source = GenerateAsyncSequence(10); await source.ProcessInParallelAsync(async (item, ct) => { int current; lock (lockObj) { currentConcurrency++; current = currentConcurrency; if (current > maxObservedConcurrency) maxObservedConcurrency = current; } await Task.Delay(50, ct); lock (lockObj) { currentConcurrency--; } }, maxConcurrency: maxConcurrency); Assert.True(maxObservedConcurrency <= maxConcurrency, $"Max observed concurrency {maxObservedConcurrency} exceeded limit {maxConcurrency}"); } [Fact] public async Task SubscribeStreamAsync_YieldsFromChannel() { var fake = new FakeLmxProxyClient(); var addresses = new[] { "tag1", "tag2" }; using var cts = new CancellationTokenSource(); var results = new List<(string Tag, Vtq Vtq)>(); // Start the subscription stream in a background task var streamTask = Task.Run(async () => { await foreach (var item in fake.SubscribeStreamAsync(addresses, cts.Token)) { results.Add(item); if (results.Count >= 3) await cts.CancelAsync(); } }); // Wait for subscribe to be called with a polling loop for (int i = 0; i < 50 && fake.CapturedOnUpdate is null; i++) await Task.Delay(50); // Simulate updates via captured callback Assert.NotNull(fake.CapturedOnUpdate); fake.CapturedOnUpdate!("tag1", new Vtq(1.0, DateTime.UtcNow, Quality.Good)); fake.CapturedOnUpdate!("tag2", new Vtq(2.0, DateTime.UtcNow, Quality.Good)); fake.CapturedOnUpdate!("tag1", new Vtq(3.0, DateTime.UtcNow, Quality.Good)); // Wait for stream task to complete (cancelled after 3 items) try { await streamTask; } catch (OperationCanceledException) { } Assert.Equal(3, results.Count); Assert.Equal("tag1", results[0].Tag); Assert.Equal("tag2", results[1].Tag); Assert.Equal("tag1", results[2].Tag); } private static async IAsyncEnumerable> GenerateWriteValues(int count) { for (int i = 0; i < count; i++) { yield return new KeyValuePair( $"tag{i}", new TypedValue { DoubleValue = i * 1.0 }); await Task.Yield(); } } private static async IAsyncEnumerable GenerateAsyncSequence(int count) { for (int i = 0; i < count; i++) { yield return i; await Task.Yield(); } } }