Issue #14: implement event streaming and backpressure
This commit is contained in:
@@ -29,6 +29,7 @@ public sealed class WorkerClient : IWorkerClient
|
||||
private WorkerClientState _state;
|
||||
private DateTimeOffset _lastHeartbeatAt;
|
||||
private int? _processId;
|
||||
private int _eventQueueDepth;
|
||||
private Task? _readLoopTask;
|
||||
private Task? _writeLoopTask;
|
||||
private Task? _heartbeatLoopTask;
|
||||
@@ -197,6 +198,8 @@ public sealed class WorkerClient : IWorkerClient
|
||||
{
|
||||
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth));
|
||||
_metrics?.SetEventQueueDepth(queueDepth);
|
||||
yield return workerEvent;
|
||||
}
|
||||
}
|
||||
@@ -394,11 +397,6 @@ public sealed class WorkerClient : IWorkerClient
|
||||
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
|
||||
}
|
||||
|
||||
if (!await _events.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_events.Writer.TryWrite(workerEvent))
|
||||
{
|
||||
_metrics?.QueueOverflow("worker-events");
|
||||
@@ -406,7 +404,11 @@ public sealed class WorkerClient : IWorkerClient
|
||||
WorkerClientErrorCode.ProtocolViolation,
|
||||
"Worker event channel rejected an event.",
|
||||
null);
|
||||
return;
|
||||
}
|
||||
|
||||
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
|
||||
_metrics?.SetEventQueueDepth(queueDepth);
|
||||
}
|
||||
|
||||
private void CompleteCommand(WorkerEnvelope envelope)
|
||||
|
||||
Reference in New Issue
Block a user