using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
namespace ZB.MOM.WW.MxGateway.Server.Galaxy;
///
/// Channel-based fan-out of Galaxy deploy events to streaming gRPC subscribers. Each
/// subscriber gets a private bounded channel so a slow client cannot back-pressure
/// other subscribers or the publisher. When a subscriber's channel is full the oldest
/// event is dropped — clients use the sequence field to detect gaps.
///
///
/// Publishes Galaxy deploy events to streaming gRPC subscribers via private bounded channels.
///
public sealed class GalaxyDeployNotifier : IGalaxyDeployNotifier
{
private const int SubscriberQueueCapacity = 16;
private readonly ConcurrentDictionary> _subscribers = new();
private GalaxyDeployEventInfo? _latest;
///
/// The most recent deploy event, or null if none has been published.
///
public GalaxyDeployEventInfo? Latest => Volatile.Read(ref _latest);
///
public void Publish(GalaxyDeployEventInfo info)
{
ArgumentNullException.ThrowIfNull(info);
Volatile.Write(ref _latest, info);
foreach (Channel channel in _subscribers.Values)
{
// BoundedChannelFullMode.DropOldest -> writes never wait; we only fail if the
// channel was completed by the subscriber side, which we ignore.
channel.Writer.TryWrite(info);
}
}
///
public async IAsyncEnumerable SubscribeAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Guid subscriberId = Guid.NewGuid();
Channel channel = Channel.CreateBounded(
new BoundedChannelOptions(SubscriberQueueCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false,
});
_subscribers[subscriberId] = channel;
// Bootstrap: emit the latest known event so subscribers don't need to wait for
// the next deploy to know current state.
GalaxyDeployEventInfo? bootstrap = Volatile.Read(ref _latest);
if (bootstrap is not null)
{
channel.Writer.TryWrite(bootstrap);
}
try
{
while (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (channel.Reader.TryRead(out GalaxyDeployEventInfo? next))
{
yield return next;
}
}
}
finally
{
_subscribers.TryRemove(subscriberId, out _);
channel.Writer.TryComplete();
}
}
}