feat(adminui): add connection-health signal to in-process broadcaster + bridges

This commit is contained in:
Joseph Doherty
2026-06-11 09:20:36 -04:00
parent 565b77e6cf
commit 3a0e0907e4
4 changed files with 130 additions and 2 deletions
@@ -38,13 +38,22 @@ public sealed class AlertSignalRBridge : ReceiveActor
_hub = hub;
_broadcaster = broadcaster;
ReceiveAsync<AlarmTransitionEvent>(ForwardAsync);
Receive<SubscribeAck>(_ => { /* DPS confirmation */ });
// DPS subscription is now live — mark the feed connected so the Blazor "live" pill lights up.
Receive<SubscribeAck>(_ => _broadcaster.SetConnected(true));
}
/// <inheritdoc />
protected override void PreStart() =>
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(TopicName, Self));
/// <inheritdoc />
protected override void PostStop()
{
// Bridge stopping — the feed is no longer live, drop the "live" pill.
_broadcaster.SetConnected(false);
base.PostStop();
}
private async Task ForwardAsync(AlarmTransitionEvent msg)
{
// In-process fan-out first — this is what the Blazor Server Alerts page reads. The hub push
@@ -26,16 +26,78 @@ public interface IInProcessBroadcaster<T>
/// <summary>Fan the item out to all current <see cref="Received"/> subscribers.</summary>
void Publish(T item);
/// <summary>
/// Whether the upstream feed (the per-node SignalR bridge's DPS subscription) is currently
/// live. Drives the "live" pill on the Blazor pages. False until the bridge's first
/// <c>SubscribeAck</c>; flips false again when the bridge stops.
/// </summary>
bool IsConnected { get; }
/// <summary>
/// Raised whenever <see cref="IsConnected"/> changes (and only on change), with the new value.
/// Handlers run on the caller's thread (the bridge actor), so Blazor subscribers must marshal
/// via <c>InvokeAsync</c>.
/// </summary>
event Action<bool>? ConnectionStateChanged;
/// <summary>
/// Set by the bridge actor from its DPS-subscription health: <c>true</c> on <c>SubscribeAck</c>
/// (subscription live), <c>false</c> on <c>PostStop</c>/failure. Raises
/// <see cref="ConnectionStateChanged"/> only when the value actually changes.
/// </summary>
/// <param name="connected">The new connection state.</param>
void SetConnected(bool connected);
}
/// <summary>Thread-safe singleton implementation of <see cref="IInProcessBroadcaster{T}"/>.</summary>
/// <typeparam name="T">The event payload type.</typeparam>
public sealed class InProcessBroadcaster<T> : IInProcessBroadcaster<T>
{
// Guards _isConnected: the bridge actor sets it on the actor thread; Blazor reads it on the
// render thread, so access must be serialised.
private readonly object _connectionLock = new();
private bool _isConnected;
/// <inheritdoc />
public event Action<T>? Received;
/// <inheritdoc />
public event Action<bool>? ConnectionStateChanged;
/// <inheritdoc />
// Capture-then-invoke (via ?.) so a concurrent unsubscribe can't null the delegate mid-raise.
public void Publish(T item) => Received?.Invoke(item);
/// <inheritdoc />
public bool IsConnected
{
get
{
lock (_connectionLock)
{
return _isConnected;
}
}
}
/// <inheritdoc />
public void SetConnected(bool connected)
{
Action<bool>? handler;
lock (_connectionLock)
{
if (_isConnected == connected)
{
return;
}
_isConnected = connected;
// Capture inside the lock, invoke outside (mirrors Publish) so a concurrent
// unsubscribe can't null the delegate mid-raise and we never hold the lock during a callback.
handler = ConnectionStateChanged;
}
handler?.Invoke(connected);
}
}
@@ -32,13 +32,22 @@ public sealed class ScriptLogSignalRBridge : ReceiveActor
_hub = hub;
_broadcaster = broadcaster;
ReceiveAsync<ScriptLogEntry>(ForwardAsync);
Receive<SubscribeAck>(_ => { /* DPS confirmation */ });
// DPS subscription is now live — mark the feed connected so the Blazor "live" pill lights up.
Receive<SubscribeAck>(_ => _broadcaster.SetConnected(true));
}
/// <inheritdoc />
protected override void PreStart() =>
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(TopicName, Self));
/// <inheritdoc />
protected override void PostStop()
{
// Bridge stopping — the feed is no longer live, drop the "live" pill.
_broadcaster.SetConnected(false);
base.PostStop();
}
private async Task ForwardAsync(ScriptLogEntry msg)
{
// In-process fan-out first — this is what the Blazor Server Script log page reads. The hub
@@ -48,4 +48,52 @@ public sealed class InProcessBroadcasterTests
var broadcaster = new InProcessBroadcaster<int>();
Should.NotThrow(() => broadcaster.Publish(42));
}
[Fact]
public void New_broadcaster_is_not_connected()
{
var broadcaster = new InProcessBroadcaster<string>();
broadcaster.IsConnected.ShouldBeFalse();
}
[Fact]
public void SetConnected_true_flips_state_and_raises_once()
{
var broadcaster = new InProcessBroadcaster<string>();
var raised = new List<bool>();
broadcaster.ConnectionStateChanged += raised.Add;
broadcaster.SetConnected(true);
broadcaster.IsConnected.ShouldBeTrue();
raised.ShouldBe([true]);
}
[Fact]
public void SetConnected_same_value_does_not_raise()
{
var broadcaster = new InProcessBroadcaster<string>();
var raised = new List<bool>();
broadcaster.ConnectionStateChanged += raised.Add;
broadcaster.SetConnected(true);
broadcaster.SetConnected(true);
raised.ShouldBe([true]);
}
[Fact]
public void SetConnected_false_after_true_raises_false()
{
var broadcaster = new InProcessBroadcaster<string>();
var raised = new List<bool>();
broadcaster.ConnectionStateChanged += raised.Add;
broadcaster.SetConnected(true);
broadcaster.SetConnected(false);
broadcaster.IsConnected.ShouldBeFalse();
raised[^1].ShouldBeFalse();
}
}