using Grpc.Core;
namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
/// <summary>
/// Thread-safe <see cref="IServerStreamWriter{T}"/> that records every written message
/// and lets a test await the first message or a specific message count with a timeout.
/// </summary>
/// <typeparam name="T">The streamed message type.</typeparam>
public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
    // Guards _messages and _countWaiters.
    private readonly object _syncRoot = new();

    // Completed with the first message ever written; later writes are ignored by TrySetResult.
    private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);

    private readonly List<T> _messages = [];

    // One entry per WaitForMessageCountAsync caller that is currently parked.
    private readonly List<TaskCompletionSource<IReadOnlyList<T>>> _countWaiters = [];

    /// <summary>Gets a point-in-time copy of the messages written to this stream, in order.</summary>
    public IReadOnlyList<T> Messages
    {
        get
        {
            lock (_syncRoot)
            {
                return _messages.ToArray();
            }
        }
    }

    /// <summary>Gets or sets options for writing messages to the stream.</summary>
    public WriteOptions? WriteOptions { get; set; }

    /// <summary>Records the supplied message and wakes any pending waiters.</summary>
    /// <param name="message">The message to record.</param>
    /// <returns>A completed task.</returns>
    public Task WriteAsync(T message)
    {
        TaskCompletionSource<IReadOnlyList<T>>[]? woken = null;
        IReadOnlyList<T>? snapshot = null;

        lock (_syncRoot)
        {
            _messages.Add(message);
            _firstMessage.TrySetResult(message);

            // Every parked count waiter is woken on each write. The waiter itself compares
            // the snapshot size with its own target and re-registers if it needs more.
            if (_countWaiters.Count > 0)
            {
                snapshot = _messages.ToArray();
                woken = _countWaiters.ToArray();
                _countWaiters.Clear();
            }
        }

        // Complete the waiters after releasing the lock so no completion work runs under it.
        if (woken is not null && snapshot is not null)
        {
            foreach (TaskCompletionSource<IReadOnlyList<T>> waiter in woken)
            {
                waiter.TrySetResult(snapshot);
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>Waits for the first message to be written within the specified timeout.</summary>
    /// <param name="timeout">Maximum time to wait for the first message.</param>
    /// <returns>The first message written to this stream.</returns>
    /// <exception cref="TimeoutException">No message was written before <paramref name="timeout"/> elapsed.</exception>
    public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout) =>
        await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);

    /// <summary>
    /// Waits until at least <paramref name="count"/> messages have been written, then returns
    /// the current snapshot. The wait is bounded by a single deadline of now + timeout;
    /// intermediate wakeups (when a message arrives but the count is not yet met) consume from
    /// that same deadline so the total elapsed time never exceeds <paramref name="timeout"/>.
    /// </summary>
    /// <param name="count">Minimum number of messages to wait for.</param>
    /// <param name="timeout">Maximum total time to wait, measured from the moment of the call.</param>
    /// <returns>A snapshot of all messages received so far (at least <paramref name="count"/>).</returns>
    /// <exception cref="OperationCanceledException">
    /// Fewer than <paramref name="count"/> messages arrived before the deadline. The deadline is
    /// enforced through a cancellation token, so expiry surfaces as a cancellation rather than
    /// as a <see cref="TimeoutException"/>.
    /// </exception>
    public async Task<IReadOnlyList<T>> WaitForMessageCountAsync(int count, TimeSpan timeout)
    {
        // Capture a single deadline so every iteration of the loop below draws from the
        // same budget — using WaitAsync(timeout) per iteration would reset the clock on
        // each intermediate wakeup, effectively giving N×timeout total budget.
        using CancellationTokenSource deadlineCts = new(timeout);
        CancellationToken deadlineToken = deadlineCts.Token;

        TaskCompletionSource<IReadOnlyList<T>> waiter;
        lock (_syncRoot)
        {
            if (_messages.Count >= count)
            {
                return _messages.ToArray();
            }

            waiter = RegisterCountWaiter();
        }

        try
        {
            // Re-check each time any message arrives. The waiter is completed on every write,
            // but the caller may need more messages, so loop until the count is met.
            while (true)
            {
                IReadOnlyList<T> snapshot = await waiter.Task.WaitAsync(deadlineToken).ConfigureAwait(false);
                if (snapshot.Count >= count)
                {
                    return snapshot;
                }

                // Not enough yet — register a new waiter and keep waiting against the same deadline.
                lock (_syncRoot)
                {
                    if (_messages.Count >= count)
                    {
                        return _messages.ToArray();
                    }

                    waiter = RegisterCountWaiter();
                }
            }
        }
        finally
        {
            // If the deadline fired, the current waiter is still registered. Remove it so it
            // does not linger in the list until (or unless) another message is written.
            // On the success paths WriteAsync has already removed it and this is a no-op.
            lock (_syncRoot)
            {
                _countWaiters.Remove(waiter);
            }
        }
    }

    /// <summary>
    /// Creates a new count waiter and adds it to the pending list.
    /// Must be called while holding <c>_syncRoot</c>.
    /// </summary>
    private TaskCompletionSource<IReadOnlyList<T>> RegisterCountWaiter()
    {
        TaskCompletionSource<IReadOnlyList<T>> waiter = new(TaskCreationOptions.RunContinuationsAsynchronously);
        _countWaiters.Add(waiter);
        return waiter;
    }
}