refactor: rename ScadaLink → ZB.MOM.WW.ScadaBridge (code + projects + namespaces)

Solution + 23 src projects + 26 test projects renamed; folders, csproj files,
namespaces, and the ScadaLinkDbContext → ScadaBridgeDbContext class updated.
ActorSystem "scadalink" → "scadabridge", Akka seed-node URLs migrated.
SQL roles/logins, LDAP domains, CLI command name, and CLI config dir
(~/.scadalink → ~/.scadabridge) also renamed.

Build green; 5 Host.Tests fail awaiting SQL login rename in next commit.
Pre-existing StaleTagMonitor timing flakes unchanged.

Rename script committed at tools/rename-to-scadabridge.sh.
This commit is contained in:
Joseph Doherty
2026-05-28 09:37:45 -04:00
parent 6d87ee3c3b
commit 7b0b9c7365
1531 changed files with 11180 additions and 11054 deletions
@@ -0,0 +1,582 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Communication;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// <summary>
/// Abstraction for creating ClusterClient instances per site, enabling testability.
/// </summary>
/// <remarks>
/// <see cref="CentralCommunicationActor"/> calls <see cref="Create"/> once for every
/// site that is new to its cache and again whenever a site's contact-address set
/// changes (after stopping the previous client for that site).
/// </remarks>
public interface ISiteClientFactory
{
/// <summary>Creates a ClusterClient actor for the given site with the specified contact points.</summary>
/// <param name="system">The actor system in which to create the client.</param>
/// <param name="siteId">The site identifier, used to name the actor.</param>
/// <param name="contacts">The set of receptionist actor paths to use as initial contacts.</param>
/// <returns>An actor reference for the new ClusterClient.</returns>
IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts);
}
/// <summary>
/// Production <see cref="ISiteClientFactory"/>: spawns a genuine Akka.NET
/// <see cref="ClusterClient"/> actor for each site it is asked about.
/// </summary>
public class DefaultSiteClientFactory : ISiteClientFactory
{
    /// <inheritdoc />
    public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts)
    {
        // The client is spawned through the ActorSystem itself (not an actor
        // context) and is named after the site it talks to.
        var clientProps = ClusterClient.Props(
            ClusterClientSettings.Create(system).WithInitialContacts(contacts));
        var actorName = $"site-client-{siteId}";

        return system.ActorOf(clientProps, actorName);
    }
}
/// <summary>
/// Central-side actor that routes messages from central to site clusters via ClusterClient.
/// Resolves site addresses from the database on a periodic refresh cycle and manages
/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
/// </summary>
public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IServiceProvider _serviceProvider;
private readonly ISiteClientFactory _siteClientFactory;
/// <summary>
/// Per-site ClusterClient instances and their contact addresses.
/// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
/// Refreshed periodically via RefreshSiteAddresses.
/// </summary>
private readonly Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();
// Communication-016: the previous _debugSubscriptions / _inProgressDeployments
// dictionaries existed solely to support a documented "synchronous kill streams +
// mark deployments failed on site disconnect" workflow triggered by
// ConnectionStateChanged. No production code ever emitted that message — only
// the unit test did — so the workflow was dead from end to end. Disconnect
// detection is owned by the underlying transports: the gRPC keepalive PING
// signals stream interruption in ~25s (handled by DebugStreamBridgeActor's own
// reconnection logic), and an Ask round-trip for a deploy times out at the
// CommunicationService layer (caller sees failure). The tracking dicts +
// ConnectionStateChanged record + HandleConnectionStateChanged handler are
// removed; see docs/requirements/Component-Communication.md "Connection
// Failure Behavior" for the keepalive-based contract that survives.
private ICancelable? _refreshSchedule;
/// <summary>
/// Communication-019: per-actor lifecycle CTS threaded into the periodic
/// <see cref="LoadSiteAddressesFromDb"/> repository call so a hung MS SQL
/// connection is bounded by actor shutdown rather than holding piped tasks
/// open indefinitely. Cancelled in <see cref="PostStop"/>; never reset.
/// </summary>
private readonly CancellationTokenSource _lifecycleCts = new();
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central NotificationOutboxActor cluster singleton.
/// Set via <see cref="RegisterNotificationOutbox"/> — the Host creates the singleton proxy
/// after this actor and registers it (mirrors how the site-side actor receives its
/// runtime <see cref="IActorRef"/>s). Null until registration completes; a notification
/// arriving before then is rejected with a non-accepted ack so the site retries.
/// </summary>
private IActorRef? _notificationOutboxProxy;
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central AuditLogIngestActor cluster
/// singleton. Set via <see cref="RegisterAuditIngest"/> — the Host creates the
/// singleton proxy after this actor and registers it (mirrors
/// <see cref="_notificationOutboxProxy"/>). Null until registration completes;
/// an audit ingest command arriving before then is answered with an empty
/// reply so the site keeps its rows Pending and retries.
///
/// Once registered, the handler Asks this proxy and pipes the reply straight
/// back to the caller. On an Ask timeout or a faulted reply, PipeTo forwards a
/// <see cref="Status.Failure"/> to the caller — the fault propagates rather
/// than being swallowed. This differs from the gRPC handler
/// (<c>SiteStreamGrpcServer</c>), which catches the exception and returns an
/// empty ack; here the faulted Ask is the transient signal the site relies on
/// (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private IActorRef? _auditIngestProxy;
/// <summary>
/// Default Ask timeout for routing audit ingest commands to the
/// AuditLogIngestActor proxy — 30 s, matching the value of
/// <c>SiteStreamGrpcServer.AuditIngestAskTimeout</c> (that constant is private
/// to the gRPC server and not reachable here, so it is declared locally). A
/// generous window absorbs a slow MS SQL connection without the round-trip
/// surfacing as a failure on a healthy site. When the window is exceeded the
/// Ask faults and that fault is piped back to the caller as a
/// <see cref="Status.Failure"/> (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// Effective Ask timeout for audit ingest routing. Defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/>; overridable via the constructor
/// so tests can exercise the timeout/fault path without waiting 30 s.
/// </summary>
private readonly TimeSpan _auditIngestAskTimeout;
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. See
/// <see cref="SiteHealthReportReplica"/> for the protocol rationale.
/// </summary>
private const string HealthReportTopic = "site-health-replica";
/// <summary>Initializes the <see cref="CentralCommunicationActor"/> and wires all message handlers.</summary>
/// <remarks>
/// The constructor only stores its dependencies and registers handlers; it performs
/// no database access. The first site-address load is triggered by the
/// <see cref="RefreshSiteAddresses"/> message scheduled in <see cref="PreStart"/>.
/// </remarks>
/// <param name="serviceProvider">DI service provider for scoped repository and aggregator access.</param>
/// <param name="siteClientFactory">Factory used to create per-site ClusterClient actors.</param>
/// <param name="auditIngestAskTimeout">
/// Optional override for the audit-ingest Ask timeout; defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/> (30 s). Exists only so tests can
/// exercise the timeout/fault path quickly — production always uses the default.
/// </param>
public CentralCommunicationActor(
IServiceProvider serviceProvider,
ISiteClientFactory siteClientFactory,
TimeSpan? auditIngestAskTimeout = null)
{
_serviceProvider = serviceProvider;
_siteClientFactory = siteClientFactory;
_auditIngestAskTimeout = auditIngestAskTimeout ?? DefaultAuditIngestAskTimeout;
// Site address cache loaded from database
Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);
// Periodic refresh trigger
Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());
// Communication-006: a faulted LoadSiteAddressesFromDb task is piped here as a
// Status.Failure. Without this handler the failure was an unhandled message
// (debug-level only) and the refresh failed silently — operators could not
// distinguish "no sites configured" from "database is down". Log at Warning.
// NOTE(review): this handler matches every Status.Failure delivered to this
// actor, whatever its origin, yet the log text always attributes the failure
// to the site-address load — confirm no other producer pipes failures here.
Receive<Status.Failure>(failure =>
_log.Warning(failure.Cause,
"Failed to load site addresses from the database; the site ClusterClient "
+ "cache was not refreshed and may be stale or empty"));
// Health monitoring: heartbeats and health reports from sites
Receive<HeartbeatMessage>(HandleHeartbeat);
Receive<SiteHealthReport>(HandleSiteHealthReport);
// Peer-replicated reports are applied locally only (no re-publish).
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
// Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope);
// Notification Outbox: the Host registers the outbox singleton proxy after this
// actor is created (the proxy cannot exist before this actor's construction).
Receive<RegisterNotificationOutbox>(msg =>
{
_notificationOutboxProxy = msg.OutboxProxy;
_log.Info("Registered notification outbox proxy");
});
// Notification Outbox ingest: a site forwards a buffered NotificationSubmit to the
// central cluster via ClusterClient. Forward to the outbox proxy so the original
// Sender (the site's ClusterClient path) is preserved and the NotificationSubmitAck
// routes straight back to the site.
Receive<NotificationSubmit>(HandleNotificationSubmit);
// Notification Outbox status query: forward to the outbox proxy, preserving Sender
// so the NotificationStatusResponse routes back to the querying site.
Receive<NotificationStatusQuery>(HandleNotificationStatusQuery);
// Audit Log (#23): the Host registers the AuditLogIngestActor singleton
// proxy after this actor is created (the proxy cannot exist before this
// actor's construction).
Receive<RegisterAuditIngest>(msg =>
{
_auditIngestProxy = msg.AuditIngestActor;
_log.Info("Registered audit ingest proxy");
});
// Audit Log (#23) site→central ingest: a site forwards a batch of audit
// events to the central cluster via ClusterClient. Ask the ingest proxy
// and pipe the IngestAuditEventsReply back to the original Sender (the
// site's ClusterClient path) so the site can flip its rows to Forwarded.
Receive<IngestAuditEventsCommand>(HandleIngestAuditEvents);
// Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy
// the same way; the proxy replies with an IngestCachedTelemetryReply.
Receive<IngestCachedTelemetryCommand>(HandleIngestCachedTelemetry);
}
/// <summary>
/// Routes a site-forwarded <see cref="NotificationSubmit"/> to the notification
/// outbox, or rejects it with a non-accepted ack while no outbox proxy is registered.
/// </summary>
/// <param name="msg">The notification submitted by a site.</param>
private void HandleNotificationSubmit(NotificationSubmit msg)
{
    if (_notificationOutboxProxy is { } outbox)
    {
        // Forward (not Tell) keeps the original Sender so the ack goes back to the site.
        _log.Debug("Routing NotificationSubmit {0} to the notification outbox", msg.NotificationId);
        outbox.Forward(msg);
        return;
    }

    // Outbox proxy not registered yet: answer with Accepted=false so the site's
    // Store-and-Forward forwarder sees a transient failure and retries later.
    _log.Warning(
        "Cannot route NotificationSubmit {0} — notification outbox not available",
        msg.NotificationId);
    var rejection = new NotificationSubmitAck(
        msg.NotificationId, Accepted: false, Error: "notification outbox not available");
    Sender.Tell(rejection);
}
/// <summary>
/// Routes a <see cref="NotificationStatusQuery"/> to the notification outbox, or
/// replies "not found" while no outbox proxy is registered.
/// </summary>
/// <param name="msg">The status query sent by a site.</param>
private void HandleNotificationStatusQuery(NotificationStatusQuery msg)
{
    var outbox = _notificationOutboxProxy;
    if (outbox is not null)
    {
        // Forward preserves Sender so the response routes back to the querying site.
        _log.Debug("Routing NotificationStatusQuery {0} to the notification outbox", msg.NotificationId);
        outbox.Forward(msg);
        return;
    }

    // Outbox proxy not registered yet: reply Found=false so the querying site
    // falls back to its local Store-and-Forward buffer to resolve the status.
    _log.Warning(
        "Cannot route NotificationStatusQuery {0} — notification outbox not available",
        msg.NotificationId);
    var notFound = new NotificationStatusResponse(
        msg.CorrelationId, Found: false, Status: "Unknown",
        RetryCount: 0, LastError: null, DeliveredAt: null);
    Sender.Tell(notFound);
}
/// <summary>
/// Relays a site's audit-event batch to the audit ingest proxy and pipes the
/// proxy's reply (or fault) back to the site.
/// </summary>
/// <param name="msg">The batch of audit events forwarded by a site.</param>
private void HandleIngestAuditEvents(IngestAuditEventsCommand msg)
{
    var ingest = _auditIngestProxy;
    if (ingest is null)
    {
        // Host startup race: the ingest proxy has not been registered yet. An
        // empty reply acknowledges nothing, so the site keeps its rows Pending
        // and retries — the same behaviour as the gRPC handler's wiring-race path.
        _log.Warning(
            "Cannot route IngestAuditEventsCommand ({0} events) — audit ingest not available",
            msg.Events.Count);
        Sender.Tell(new IngestAuditEventsReply(Array.Empty<Guid>()));
        return;
    }

    // Sender must be captured before the asynchronous Ask/PipeTo, because Akka
    // resets Sender between dispatches. A timed-out or faulted Ask is delivered
    // to the site as a Status.Failure rather than swallowed; the site drain loop
    // treats that as transient, leaves rows Pending and retries on its next tick.
    // (The gRPC handler instead returns an empty ack on fault.)
    var origin = Sender;
    _log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);

    var pendingReply = ingest.Ask<IngestAuditEventsReply>(msg, _auditIngestAskTimeout);
    pendingReply.PipeTo(origin);
}
/// <summary>
/// Relays a site's cached-telemetry batch to the audit ingest proxy and pipes
/// the proxy's reply (or fault) back to the site.
/// </summary>
/// <param name="msg">The combined-telemetry batch forwarded by a site.</param>
private void HandleIngestCachedTelemetry(IngestCachedTelemetryCommand msg)
{
    var ingest = _auditIngestProxy;
    if (ingest is null)
    {
        // Ingest proxy not registered yet: reply with an empty id list.
        _log.Warning(
            "Cannot route IngestCachedTelemetryCommand ({0} entries) — audit ingest not available",
            msg.Entries.Count);
        Sender.Tell(new IngestCachedTelemetryReply(Array.Empty<Guid>()));
        return;
    }

    // Capture Sender up front; it is not reliable once the Ask completes.
    var origin = Sender;
    _log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);

    var pendingReply = ingest.Ask<IngestCachedTelemetryReply>(msg, _auditIngestAskTimeout);
    pendingReply.PipeTo(origin);
}
/// <summary>
/// Records a site heartbeat on the central health aggregator, if one is
/// resolvable from the service provider; otherwise the heartbeat is ignored.
/// </summary>
/// <param name="heartbeat">The heartbeat received from a site.</param>
private void HandleHeartbeat(HeartbeatMessage heartbeat)
{
    if (_serviceProvider.GetService<ICentralHealthAggregator>() is { } aggregator)
    {
        aggregator.MarkHeartbeat(heartbeat.SiteId, heartbeat.Timestamp);
    }
}
/// <summary>
/// Handles a report delivered directly from a site (via ClusterClient):
/// process locally, then fan out to the peer central node so its
/// aggregator stays in sync.
/// </summary>
/// <param name="report">The health report received from a site.</param>
private void HandleSiteHealthReport(SiteHealthReport report)
{
    ProcessLocally(report);

    try
    {
        DistributedPubSub.Get(Context.System).Mediator.Tell(
            new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
    }
    catch (Exception ex)
    {
        // Replication is best-effort: non-clustered hosts (TestKit) have no
        // DistributedPubSub extension. Log at Debug — the same level PreStart
        // uses for the matching Subscribe — instead of swallowing silently, so
        // an unexpected publish failure is at least diagnosable.
        _log.Debug(
            "DistributedPubSub not available — health report from site {0} not replicated to peer: {1}",
            report.SiteId, ex.Message);
    }
}
/// <summary>
/// Feeds a health report into this node's aggregator and never re-publishes it.
/// Shared by the site-originated path and the peer-replica path; the aggregator
/// is idempotent via sequence-number comparison.
/// </summary>
/// <param name="report">The report to apply locally.</param>
private void ProcessLocally(SiteHealthReport report)
{
    var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
    if (aggregator is null)
    {
        _log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId);
        return;
    }

    aggregator.ProcessReport(report);
}
// Communication-016: HandleConnectionStateChanged removed — no production
// caller emitted ConnectionStateChanged, so the workflow ran only in tests.
// Disconnect detection is owned by the transport layers (gRPC keepalive +
// ClusterClient/Ask timeout).
/// <summary>
/// Sends the enveloped message to the target site through that site's
/// ClusterClient. When no client is cached for the site the message is dropped
/// and a warning is logged.
/// </summary>
/// <param name="envelope">The site id plus the message to deliver.</param>
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
    if (_siteClients.TryGetValue(envelope.SiteId, out var entry))
    {
        // Pass the original Sender along so an Ask response routes back to the caller.
        var send = new ClusterClient.Send("/user/site-communication", envelope.Message);
        entry.Client.Tell(send, Sender);
        return;
    }

    // No central buffering (WP-5): the caller's Ask simply times out.
    _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
        envelope.SiteId, envelope.Message.GetType().Name);
}
/// <summary>
/// Loads every site's contact addresses from the repository on a background
/// task and pipes the result to this actor as a
/// <see cref="SiteAddressCacheLoaded"/> message. A faulted load is piped back
/// as a <see cref="Status.Failure"/>.
/// </summary>
private void LoadSiteAddressesFromDb()
{
    var self = Self;

    // Communication-019: pass the actor's lifecycle CT into the repository
    // call so a hung database query is cancelled when the actor stops
    // rather than leaving the piped task to accumulate. Captured locally
    // because the lifecycle CTS may have been disposed by PostStop on a
    // racing late tick; treat that as "actor gone, give up".
    CancellationToken ct;
    try
    {
        ct = _lifecycleCts.Token;
    }
    catch (ObjectDisposedException)
    {
        return;
    }

    Task.Run(async () =>
    {
        using var scope = _serviceProvider.CreateScope();
        var repo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
        var sites = await repo.GetAllSitesAsync(ct).ConfigureAwait(false);

        // Communication-020: the cross-task payload is built from read-only
        // views only (IReadOnlyDictionary / IReadOnlyList), so the Akka.NET
        // message-immutability convention is enforced by type.
        var contacts = new Dictionary<string, IReadOnlyList<string>>();
        foreach (var site in sites)
        {
            var addrs = new List<string>(2);
            AddContactAddress(addrs, site.NodeAAddress);
            AddContactAddress(addrs, site.NodeBAddress);

            // Sites with no usable address are left out of the payload entirely.
            if (addrs.Count > 0)
            {
                contacts[site.SiteIdentifier] = addrs.AsReadOnly();
            }
        }

        return new SiteAddressCacheLoaded(contacts);
    }).PipeTo(self);
}

/// <summary>
/// Appends <paramref name="address"/> to <paramref name="target"/> after
/// stripping any legacy "/user/..." actor-path suffix. Null, empty, or
/// whitespace-only addresses are skipped.
/// </summary>
/// <param name="target">The per-site address list being built.</param>
/// <param name="address">A node address as stored on the site row.</param>
private static void AddContactAddress(List<string> target, string? address)
{
    if (string.IsNullOrWhiteSpace(address))
    {
        return;
    }

    // Ordinal search: the address is a machine identifier, so matching must
    // not depend on the current culture. A match at index 0 is deliberately
    // left alone (stripping it would yield an empty address).
    var suffixStart = address.IndexOf("/user/", StringComparison.Ordinal);
    target.Add(suffixStart > 0 ? address.Substring(0, suffixStart) : address);
}
/// <summary>
/// Reconciles the per-site ClusterClient cache with a freshly loaded set of site
/// contacts: stops clients for sites that disappeared, leaves unchanged sites
/// alone, and (re)creates clients for new sites or sites whose addresses changed.
/// A failure on one site is logged and skipped so it never aborts the refresh.
/// </summary>
/// <param name="msg">The site contacts loaded from the database.</param>
private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
{
    // Stop ClusterClients for removed sites. The ids are snapshotted first
    // because the dictionary is modified inside the loop.
    var removedSiteIds = _siteClients.Keys
        .Where(id => !msg.SiteContacts.ContainsKey(id))
        .ToList();
    foreach (var removed in removedSiteIds)
    {
        _log.Info("Stopping ClusterClient for removed site {0}", removed);
        Context.Stop(_siteClients[removed].Client);
        _siteClients.Remove(removed);
    }

    // Add or update
    foreach (var (siteId, addresses) in msg.SiteContacts)
    {
        // Communication-009: parse all addresses up front inside a try/catch so a
        // single malformed site row cannot abort the whole refresh loop and leave
        // the cache half-updated. A bad site is logged and skipped; others proceed.
        ImmutableHashSet<ActorPath> contactPaths;
        try
        {
            contactPaths = addresses
                .Select(a => ActorPath.Parse($"{a}/system/receptionist"))
                .ToImmutableHashSet();
        }
        catch (Exception ex)
        {
            _log.Warning(ex,
                "Malformed contact address for site {0}; skipping this site in the refresh "
                + "(other sites are unaffected)", siteId);
            continue;
        }

        var contactStrings = addresses.ToImmutableHashSet();

        // Single lookup serves both the "unchanged" and the "changed" checks.
        var hasExisting = _siteClients.TryGetValue(siteId, out var existing);

        // Skip if unchanged
        if (hasExisting && existing.ContactAddresses.SetEquals(contactStrings))
        {
            continue;
        }

        // Stop old client if addresses changed
        if (hasExisting)
        {
            _log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
            Context.Stop(existing.Client);

            // Drop the entry right away: if re-creation below fails, routing must
            // not keep pointing at the client that was just stopped.
            _siteClients.Remove(siteId);
        }

        IActorRef client;
        try
        {
            client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
        }
        catch (Exception ex)
        {
            // Creation can fail per site — e.g. the actor name derived from the
            // site id is invalid, or is still reserved by a client that was
            // stopped a moment ago (stopping is asynchronous). The site stays
            // out of the cache, so the next refresh attempts to create it again.
            _log.Warning(ex,
                "Failed to create ClusterClient for site {0}; the site is not routable "
                + "until a later refresh succeeds (other sites are unaffected)", siteId);
            continue;
        }

        _siteClients[siteId] = (client, contactStrings);
        _log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, addresses.Count);
    }

    _log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
}
// Communication-016: TrackMessageForCleanup removed — the dicts it fed
// existed solely to support the dead ConnectionStateChanged workflow.
/// <summary>
/// Supervision policy for this actor's children: every fault is logged at
/// Warning and answered with <see cref="Directive.Resume"/>, with no retry
/// limit and no time window.
/// </summary>
/// <returns>A one-for-one strategy that always resumes the faulted child.</returns>
protected override SupervisorStrategy SupervisorStrategy()
{
    Func<Exception, Directive> resumeAfterLogging = ex =>
    {
        _log.Warning(ex, "Child actor of CentralCommunicationActor faulted, resuming (state preserved)");
        return Directive.Resume;
    };

    return new OneForOneStrategy(
        maxNrOfRetries: -1,
        withinTimeRange: Timeout.InfiniteTimeSpan,
        decider: Decider.From(resumeAfterLogging));
}
/// <summary>
/// Start-up hook: subscribes to the peer health-replication topic (best effort)
/// and schedules the recurring site-address refresh.
/// </summary>
protected override void PreStart()
{
    _log.Info("CentralCommunicationActor started");

    // ClusterClient load-balances site reports across central nodes, so each node
    // subscribes to the replication topic to also see reports that landed on its
    // peer. Hosts without clustering (TestKit) lack the extension; that is tolerated.
    try
    {
        var mediator = DistributedPubSub.Get(Context.System).Mediator;
        mediator.Tell(new Subscribe(HealthReportTopic, Self));
    }
    catch (Exception ex)
    {
        _log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
    }

    // First refresh fires immediately, then once every 60 seconds.
    var refreshInterval = TimeSpan.FromSeconds(60);
    var scheduler = Context.System.Scheduler;
    _refreshSchedule = scheduler.ScheduleTellRepeatedlyCancelable(
        TimeSpan.Zero,
        refreshInterval,
        Self,
        new RefreshSiteAddresses(),
        ActorRefs.NoSender);
}
/// <summary>
/// Shutdown hook: cancels the periodic refresh, stops every per-site
/// ClusterClient this actor created, and cancels/disposes the lifecycle CTS so
/// an in-flight database load cannot outlive the actor.
/// </summary>
protected override void PostStop()
{
    _log.Info("CentralCommunicationActor stopped");
    _refreshSchedule?.Cancel();

    // The site clients are created through the ActorSystem by the factory, not
    // as children of this actor, so they are not stopped automatically when
    // this actor stops. Stop them explicitly to avoid leaking them.
    foreach (var entry in _siteClients.Values)
    {
        Context.Stop(entry.Client);
    }
    _siteClients.Clear();

    // Communication-019: cancel any in-flight LoadSiteAddressesFromDb so a
    // hung MS SQL query does not outlive the actor.
    try
    {
        _lifecycleCts.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // Double-stop is benign.
    }
    _lifecycleCts.Dispose();
}
}
/// <summary>
/// Command to trigger a refresh of site addresses from the database.
/// <see cref="CentralCommunicationActor"/> schedules this message to itself —
/// immediately on start and then every 60 seconds — and reacts by reloading the
/// site list on a background task.
/// </summary>
public record RefreshSiteAddresses;
/// <summary>
/// Internal message carrying the loaded site contact data from the database.
/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
///
/// Communication-020: the payload is exposed as <see cref="IReadOnlyDictionary{TKey,TValue}"/>
/// of <see cref="IReadOnlyList{T}"/> so the Akka.NET "messages are immutable"
/// convention is enforced at the type level rather than relying on producer
/// discipline. The producer wraps the constructed buckets with
/// <c>List&lt;T&gt;.AsReadOnly()</c> before piping to Self.
/// </summary>
/// <param name="SiteContacts">
/// Maps each site identifier to that site's contact addresses (node A and/or
/// node B, with any legacy "/user/..." suffix already stripped). Sites with no
/// usable address are absent from the map.
/// </param>
internal record SiteAddressCacheLoaded(IReadOnlyDictionary<string, IReadOnlyList<string>> SiteContacts);
/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated
/// due to site disconnection (WP-5).
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="CentralCommunicationActor"/> does not send this message
/// (its ConnectionStateChanged workflow was removed under Communication-016), while
/// DebugStreamBridgeActor still registers a handler for it. Confirm whether any
/// sender remains.
/// </remarks>
/// <param name="SiteId">Identifier of the site whose stream was terminated.</param>
/// <param name="CorrelationId">Correlation id of the affected debug session.</param>
public record DebugStreamTerminated(string SiteId, string CorrelationId);
/// <summary>
/// Registers the central NotificationOutboxActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded <see cref="NotificationSubmit"/>
/// and <see cref="NotificationStatusQuery"/> messages can be routed to it. Sent by the Host
/// after the outbox singleton proxy is created.
/// </summary>
/// <param name="OutboxProxy">
/// Actor reference that notification submissions and status queries are forwarded to.
/// </param>
public record RegisterNotificationOutbox(IActorRef OutboxProxy);
/// <summary>
/// Registers the central AuditLogIngestActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded
/// <see cref="IngestAuditEventsCommand"/> and <see cref="IngestCachedTelemetryCommand"/>
/// messages can be routed to it. Sent by the Host after the audit-ingest
/// singleton proxy is created. Lives here (not in Commons) because
/// <c>ZB.MOM.WW.ScadaBridge.Commons</c> has no Akka package reference and cannot hold an
/// <see cref="IActorRef"/> field.
/// </summary>
/// <param name="AuditIngestActor">
/// Actor reference that audit-event and cached-telemetry ingest commands are Asked on.
/// </param>
public sealed record RegisterAuditIngest(IActorRef AuditIngestActor);
@@ -0,0 +1,291 @@
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// <summary>
/// Long-lived (one per active debug session) actor on the central side. Debug sessions
/// are session-based and temporary — this actor holds no persisted state and does not
/// derive from an Akka.Persistence base class; its state does not survive a restart.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
/// </summary>
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _siteIdentifier;
private readonly string _instanceUniqueName;
private readonly string _correlationId;
private readonly IActorRef _centralCommunicationActor;
private readonly Action<object> _onEvent;
private readonly Action _onTerminated;
private readonly SiteStreamGrpcClientFactory _grpcFactory;
private readonly string _grpcNodeAAddress;
private readonly string _grpcNodeBAddress;
private const int MaxRetries = 3;
private const string ReconnectTimerKey = "grpc-reconnect";
private const string StabilityTimerKey = "grpc-stability";
/// <summary>Delay between gRPC reconnection attempts.</summary>
internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// How long a freshly-opened gRPC stream must stay up before its retry budget
/// is considered "recovered" and <see cref="_retryCount"/> is reset to 0.
/// Communication-008: the retry count must NOT be reset by individual events —
/// a stream that connects, delivers one event, then fails repeatedly would
/// otherwise reconnect forever and never trip <see cref="MaxRetries"/>. Resetting
/// only after a stable interval bounds a flapping stream.
/// </summary>
internal static TimeSpan StabilityWindow { get; set; } = TimeSpan.FromSeconds(60);
private int _retryCount;
private bool _useNodeA = true;
private bool _stopped;
private CancellationTokenSource? _grpcCts;
/// <summary>Timer scheduler for reconnect and stability window timers.</summary>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Initializes the debug stream bridge actor and registers message handlers.
/// </summary>
/// <remarks>
/// Every terminal path (consumer stop, stream terminated, receive timeout) cleans
/// up the gRPC subscription, sets the stopped flag and stops the actor. The
/// <paramref name="onTerminated"/> callback is invoked on the terminated and
/// timeout paths but not on the consumer-requested <see cref="StopDebugStream"/> path.
/// </remarks>
/// <param name="siteIdentifier">Site identifier for targeting ClusterClient messages and logging.</param>
/// <param name="instanceUniqueName">Unique name of the instance whose debug stream is being bridged.</param>
/// <param name="correlationId">Correlation id for the debug session.</param>
/// <param name="centralCommunicationActor">Actor used to forward ClusterClient messages to the site.</param>
/// <param name="onEvent">Callback invoked on each received debug event.</param>
/// <param name="onTerminated">Callback invoked when the stream terminates.</param>
/// <param name="grpcFactory">Factory for creating gRPC streaming clients.</param>
/// <param name="grpcNodeAAddress">gRPC address of the site's node A.</param>
/// <param name="grpcNodeBAddress">gRPC address of the site's node B.</param>
public DebugStreamBridgeActor(
string siteIdentifier,
string instanceUniqueName,
string correlationId,
IActorRef centralCommunicationActor,
Action<object> onEvent,
Action onTerminated,
SiteStreamGrpcClientFactory grpcFactory,
string grpcNodeAAddress,
string grpcNodeBAddress)
{
_siteIdentifier = siteIdentifier;
_instanceUniqueName = instanceUniqueName;
_correlationId = correlationId;
_centralCommunicationActor = centralCommunicationActor;
_onEvent = onEvent;
_onTerminated = onTerminated;
_grpcFactory = grpcFactory;
_grpcNodeAAddress = grpcNodeAAddress;
_grpcNodeBAddress = grpcNodeBAddress;
// Initial snapshot response from the site (via ClusterClient).
// The snapshot is handed to the consumer first, then the gRPC stream is opened.
Receive<DebugViewSnapshot>(snapshot =>
{
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
_onEvent(snapshot);
OpenGrpcStream();
});
// Domain events arriving via Self.Tell from gRPC callback.
// Communication-008: receiving an event must NOT reset _retryCount — a
// flapping stream that delivers a single event between failures would
// otherwise never trip MaxRetries. The retry budget is recovered only by
// GrpcStreamStable (a stream that has stayed up for StabilityWindow).
Receive<AttributeValueChanged>(changed => _onEvent(changed));
Receive<AlarmStateChanged>(changed => _onEvent(changed));
// Stream has been stably connected for StabilityWindow — recover the
// retry budget so a future transient fault gets a fresh set of retries.
Receive<GrpcStreamStable>(_ =>
{
if (_stopped) return;
_retryCount = 0;
_log.Debug("gRPC stream for {0} stable, retry count reset", _instanceUniqueName);
});
// gRPC stream error — attempt reconnection
Receive<GrpcStreamError>(msg =>
{
_log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message);
HandleGrpcError();
});
// Scheduled reconnection
Receive<ReconnectGrpcStream>(_ => OpenGrpcStream());
// Consumer requests stop: clean up, tell the site to unsubscribe, stop.
// The onTerminated callback is not invoked on this path.
Receive<StopDebugStream>(_ =>
{
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
Context.Stop(Self);
});
// Site disconnected — stream reported as terminated from outside this actor.
// NOTE(review): the sender of DebugStreamTerminated is not visible in this
// file — confirm which component still emits it.
Receive<DebugStreamTerminated>(msg =>
{
if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
CleanupGrpc();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
// Orphan safety net — self-terminate when a ReceiveTimeout is delivered.
// NOTE(review): Akka's receive timeout is an inactivity timer, so this is
// expected to fire after 5 minutes without any message reaching the actor,
// not 5 minutes after start — confirm that matches the intended session
// lifetime (a quiet but still-watched stream would be stopped as well).
Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
Receive<ReceiveTimeout>(_ =>
{
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
}
/// <summary>
/// Start-up hook: asks the site for the initial debug-view snapshot by sending
/// a subscribe request through the central communication actor, with this
/// actor as the sender so the reply comes back here.
/// </summary>
protected override void PreStart()
{
    _log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);

    var subscribeEnvelope = new SiteEnvelope(
        _siteIdentifier,
        new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId));
    _centralCommunicationActor.Tell(subscribeEnvelope, Self);
}
/// <summary>
/// Shutdown hook: cancels and disposes the token source of the current gRPC
/// subscription (if any) before delegating to the base implementation.
/// </summary>
protected override void PostStop()
{
    if (_grpcCts is { } cts)
    {
        cts.Cancel();
        cts.Dispose();
        _grpcCts = null;
    }

    base.PostStop();
}
/// <summary>
/// Opens (or re-opens) the gRPC event subscription against the currently
/// selected node, replacing any previous subscription token and arming the
/// stability timer. Does nothing once the actor has been marked stopped.
/// </summary>
private void OpenGrpcStream()
{
    if (_stopped)
    {
        return;
    }

    string endpoint;
    if (_useNodeA)
    {
        endpoint = _grpcNodeAAddress;
    }
    else
    {
        endpoint = _grpcNodeBAddress;
    }
    _log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint);

    // Retire the previous subscription token (if any) before issuing a new one.
    _grpcCts?.Cancel();
    _grpcCts?.Dispose();
    var streamCts = new CancellationTokenSource();
    _grpcCts = streamCts;

    // Communication-008: the retry budget is recovered only if the stream
    // survives for StabilityWindow. HandleGrpcError cancels this timer.
    Timers.StartSingleTimer(StabilityTimerKey, new GrpcStreamStable(), StabilityWindow);

    var streamClient = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint);
    var bridge = Self;
    var streamToken = streamCts.Token;

    // Fire-and-forget background subscription. Events and errors re-enter the
    // actor as messages, so actor state is only ever touched on the actor thread.
    // NOTE(review): an exception thrown by SubscribeAsync itself (as opposed to
    // one reported through the error callback) is not observed here — confirm
    // the client always reports failures via the callback.
    Task.Run(async () =>
    {
        await streamClient.SubscribeAsync(
            _correlationId,
            _instanceUniqueName,
            evt => bridge.Tell(evt),
            ex => bridge.Tell(new GrpcStreamError(ex)),
            streamToken);
    }, streamToken);
}
/// <summary>
/// Reacts to a gRPC stream failure: consumes one retry, terminates the session
/// when the budget is exhausted, otherwise unsubscribes from the failed node,
/// fails over to the other node and schedules a reconnect.
/// </summary>
private void HandleGrpcError()
{
    if (_stopped)
    {
        return;
    }

    // Communication-008: the stream died before proving itself stable, so the
    // pending "stable" signal is withdrawn and the retry budget is not recovered.
    Timers.Cancel(StabilityTimerKey);
    _retryCount += 1;

    var budgetExhausted = _retryCount > MaxRetries;
    if (budgetExhausted)
    {
        _log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating", _instanceUniqueName, MaxRetries);
        CleanupGrpc();
        _stopped = true;
        _onTerminated();
        Context.Stop(Self);
        return;
    }

    // Unsubscribe on the endpoint that just failed *before* switching nodes. This
    // cancels the local subscription and, while the channel is still alive,
    // propagates gRPC cancellation to the site so its SiteStreamGrpcServer stops
    // the StreamRelayActor for this correlation id instead of leaving a zombie
    // relay until TCP RST / keepalive notices the loss.
    var failedEndpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;
    _grpcFactory.GetOrCreate(_siteIdentifier, failedEndpoint).Unsubscribe(_correlationId);

    // Fail over to the other node.
    _useNodeA = !_useNodeA;

    // Retry #1 reconnects immediately; later retries wait ReconnectDelay.
    var reconnect = new ReconnectGrpcStream();
    if (_retryCount != 1)
    {
        Timers.StartSingleTimer(ReconnectTimerKey, reconnect, ReconnectDelay);
    }
    else
    {
        Self.Tell(reconnect);
    }
}
// Cancels the local stream token source and unsubscribes this correlation id
// from the client of the currently selected node.
private void CleanupGrpc()
{
    if (_grpcCts is { } activeCts)
    {
        activeCts.Cancel();
        activeCts.Dispose();
    }

    _grpcCts = null;

    var currentEndpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;
    _grpcFactory.GetOrCreate(_siteIdentifier, currentEndpoint).Unsubscribe(_correlationId);
}
// Sends an UnsubscribeDebugViewRequest for this instance/correlation to the site,
// wrapped in a SiteEnvelope and routed through the central communication actor.
private void SendUnsubscribe()
{
    var unsubscribe = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
    _centralCommunicationActor.Tell(new SiteEnvelope(_siteIdentifier, unsubscribe), Self);
}
}
/// <summary>
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
/// </summary>
public record StopDebugStream;
/// <summary>
/// Internal message indicating a gRPC stream error occurred. The stream's error
/// callback posts it to the bridge actor with <c>Tell</c>, so the failure is
/// processed as an ordinary actor message.
/// </summary>
/// <param name="Exception">The exception reported by the failed stream.</param>
internal record GrpcStreamError(Exception Exception);
/// <summary>
/// Internal message to trigger gRPC stream reconnection. The bridge actor sends it
/// to itself immediately on the first retry and through a single-shot timer on
/// later retries.
/// </summary>
internal record ReconnectGrpcStream;
/// <summary>
/// Internal message indicating the current gRPC stream has been connected long
/// enough (<see cref="DebugStreamBridgeActor.StabilityWindow"/>) to be considered
/// stable, so the reconnect retry budget can be recovered. It is delivered by a
/// single-shot timer that is armed when a stream is opened and cancelled if the
/// stream errors first.
/// </summary>
internal record GrpcStreamStable;
@@ -0,0 +1,457 @@
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Artifacts;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.InboundApi;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// <summary>
/// Site-side actor that receives messages from central via ClusterClient and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
/// to central via the registered ClusterClient.
///
/// WP-4: Routes all 8 message patterns to local handlers.
/// </summary>
public class SiteCommunicationActor : ReceiveActor, IWithTimers
{
    /// <summary>Actor path on the central cluster that every site→central message is sent to.</summary>
    private const string CentralCommunicationPath = "/user/central-communication";

    /// <summary>Key of the periodic heartbeat timer started in <see cref="PreStart"/>.</summary>
    private const string HeartbeatTimerKey = "heartbeat";

    /// <summary>Error text used in every reply sent because no central ClusterClient is registered.</summary>
    private const string CentralClientNotRegistered = "Central ClusterClient not registered";

    /// <summary>Error text used in every reply sent because no parked-message handler is registered.</summary>
    private const string ParkedHandlerNotAvailable = "Parked message handler not available";

    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly string _siteId;
    private readonly CommunicationOptions _options;

    /// <summary>
    /// Communication-018: predicate that returns <c>true</c> when this node is
    /// the active member of the local site cluster (used to stamp
    /// <see cref="HeartbeatMessage.IsActive"/>). Production builds default to
    /// the Akka <see cref="Cluster"/> leader check; tests inject a stub so they
    /// do not need a real cluster.
    /// </summary>
    private readonly Func<bool> _isActiveCheck;

    /// <summary>
    /// Reference to the local Deployment Manager singleton proxy.
    /// </summary>
    private readonly IActorRef _deploymentManagerProxy;

    /// <summary>
    /// ClusterClient reference for sending messages to the central cluster.
    /// Set via RegisterCentralClient message.
    /// </summary>
    private IActorRef? _centralClient;

    /// <summary>
    /// Local actor references for routing specific message patterns.
    /// Populated via registration messages.
    /// </summary>
    private IActorRef? _eventLogHandler;
    private IActorRef? _parkedMessageHandler;
    private IActorRef? _integrationHandler;
    private IActorRef? _artifactHandler;

    /// <summary>Akka timer scheduler injected by the framework via <see cref="IWithTimers"/>.</summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>Initializes the actor and wires all message pattern handlers. The periodic heartbeat is scheduled in <see cref="PreStart"/>.</summary>
    /// <param name="siteId">The site identifier included in outbound messages.</param>
    /// <param name="options">Communication options including heartbeat interval and transport settings.</param>
    /// <param name="deploymentManagerProxy">Local reference to the Deployment Manager singleton proxy.</param>
    /// <param name="isActiveCheck">
    /// Communication-018: optional override returning <c>true</c> when this node
    /// is the active member of the site cluster. <c>null</c> uses the real
    /// Akka <see cref="Cluster"/> leader check (the default for production
    /// wiring); tests pass a stub so they do not need to load Akka.Cluster
    /// into the <c>TestKit</c> ActorSystem.
    /// </param>
    public SiteCommunicationActor(
        string siteId,
        CommunicationOptions options,
        IActorRef deploymentManagerProxy,
        Func<bool>? isActiveCheck = null)
    {
        _siteId = siteId;
        _options = options;
        _deploymentManagerProxy = deploymentManagerProxy;
        _isActiveCheck = isActiveCheck ?? DefaultIsActiveCheck;

        // NOTE: handlers are registered in a fixed order; keep it stable when editing.

        // Registration
        Receive<RegisterCentralClient>(msg =>
        {
            _centralClient = msg.Client;
            _log.Info("Registered central ClusterClient");
        });
        Receive<RegisterLocalHandler>(HandleRegisterLocalHandler);

        // Pattern 1: Instance Deployment — forward to Deployment Manager
        Receive<DeployInstanceCommand>(msg =>
        {
            _log.Debug("Routing DeployInstanceCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
            _deploymentManagerProxy.Forward(msg);
        });

        // Pattern 2: Lifecycle — forward to Deployment Manager
        Receive<DisableInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));
        Receive<EnableInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));
        Receive<DeleteInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));

        // DeploymentManager-006: query-the-site-before-redeploy — forward to
        // the Deployment Manager, which owns the deployed-config store and
        // answers with the instance's currently-applied deployment identity.
        Receive<DeploymentStateQueryRequest>(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 3: Artifact Deployment — forward to artifact handler if registered
        Receive<DeployArtifactsCommand>(msg =>
        {
            if (TryForward(_artifactHandler, msg))
            {
                return;
            }

            _log.Warning("No artifact handler registered, replying with failure");
            Sender.Tell(new ArtifactDeploymentResponse(
                msg.DeploymentId, _siteId, false, "Artifact handler not available", DateTimeOffset.UtcNow));
        });

        // Pattern 4: Integration Routing — forward to integration handler
        Receive<IntegrationCallRequest>(msg =>
        {
            if (TryForward(_integrationHandler, msg))
            {
                return;
            }

            Sender.Tell(new IntegrationCallResponse(
                msg.CorrelationId, _siteId, false, null, "Integration handler not available", DateTimeOffset.UtcNow));
        });

        // Pattern 5: Debug View — forward to Deployment Manager (which routes to Instance Actor)
        Receive<SubscribeDebugViewRequest>(msg => _deploymentManagerProxy.Forward(msg));
        Receive<UnsubscribeDebugViewRequest>(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 6a: Debug Snapshot (one-shot) — forward to Deployment Manager
        Receive<DebugSnapshotRequest>(msg => _deploymentManagerProxy.Forward(msg));

        // Inbound API Route.To() — forward to Deployment Manager for instance routing
        Receive<RouteToCallRequest>(msg => _deploymentManagerProxy.Forward(msg));
        Receive<RouteToGetAttributesRequest>(msg => _deploymentManagerProxy.Forward(msg));
        Receive<RouteToSetAttributesRequest>(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 7: Remote Queries
        Receive<EventLogQueryRequest>(msg =>
        {
            if (TryForward(_eventLogHandler, msg))
            {
                return;
            }

            Sender.Tell(new EventLogQueryResponse(
                msg.CorrelationId, _siteId, [], null, false, false,
                "Event log handler not available", DateTimeOffset.UtcNow));
        });
        Receive<ParkedMessageQueryRequest>(msg =>
        {
            if (TryForward(_parkedMessageHandler, msg))
            {
                return;
            }

            Sender.Tell(new ParkedMessageQueryResponse(
                msg.CorrelationId, _siteId, [], 0, msg.PageNumber, msg.PageSize, false,
                ParkedHandlerNotAvailable, DateTimeOffset.UtcNow));
        });
        Receive<ParkedMessageRetryRequest>(msg =>
        {
            if (TryForward(_parkedMessageHandler, msg))
            {
                return;
            }

            Sender.Tell(new ParkedMessageRetryResponse(
                msg.CorrelationId, false, ParkedHandlerNotAvailable));
        });
        Receive<ParkedMessageDiscardRequest>(msg =>
        {
            if (TryForward(_parkedMessageHandler, msg))
            {
                return;
            }

            Sender.Tell(new ParkedMessageDiscardResponse(
                msg.CorrelationId, false, ParkedHandlerNotAvailable));
        });

        // Task 5 (#22): central→site Retry/Discard relay for parked cached
        // operations. SiteCallAuditActor relays these over the command/control
        // channel; the parked-message handler executes them against the local
        // S&F buffer and replies a ParkedOperationActionAck that routes back to
        // the relaying SiteCallAuditActor's Ask.
        Receive<RetryParkedOperation>(msg =>
        {
            if (TryForward(_parkedMessageHandler, msg))
            {
                return;
            }

            Sender.Tell(new ParkedOperationActionAck(
                msg.CorrelationId, Applied: false, ParkedHandlerNotAvailable));
        });
        Receive<DiscardParkedOperation>(msg =>
        {
            if (TryForward(_parkedMessageHandler, msg))
            {
                return;
            }

            Sender.Tell(new ParkedOperationActionAck(
                msg.CorrelationId, Applied: false, ParkedHandlerNotAvailable));
        });

        // Notification Outbox: forward a buffered notification submitted by the site
        // Store-and-Forward Engine to the central cluster. The original Sender (the
        // S&F forwarder's Ask) is forwarded as the ClusterClient.Send sender so the
        // NotificationSubmitAck routes straight back to the waiting Ask, not here.
        Receive<NotificationSubmit>(msg =>
        {
            if (_centralClient is not { } central)
            {
                // No ClusterClient registered yet (e.g. central contact points not
                // configured, or registration not yet completed). A non-accepted ack
                // makes the S&F forwarder treat this as transient and retry later.
                _log.Warning(
                    "Cannot forward NotificationSubmit {0} — no central ClusterClient registered",
                    msg.NotificationId);
                Sender.Tell(new NotificationSubmitAck(
                    msg.NotificationId, Accepted: false, Error: CentralClientNotRegistered));
                return;
            }

            _log.Debug("Forwarding NotificationSubmit {0} to central", msg.NotificationId);
            SendToCentral(central, msg, Sender);
        });

        // Notification Outbox: forward a Notify.Status query to the central cluster.
        // The original Sender (the Notify helper's Ask) is forwarded as the
        // ClusterClient.Send sender so the NotificationStatusResponse routes straight
        // back to the waiting Ask, not here.
        Receive<NotificationStatusQuery>(msg =>
        {
            if (_centralClient is not { } central)
            {
                // No ClusterClient registered yet. Reply Found: false so Notify.Status
                // falls back to the site S&F buffer to decide Forwarding vs Unknown.
                _log.Warning(
                    "Cannot forward NotificationStatusQuery {0} — no central ClusterClient registered",
                    msg.NotificationId);
                Sender.Tell(new NotificationStatusResponse(
                    msg.CorrelationId, Found: false, Status: "Unknown",
                    RetryCount: 0, LastError: null, DeliveredAt: null));
                return;
            }

            _log.Debug("Forwarding NotificationStatusQuery {0} to central", msg.NotificationId);
            SendToCentral(central, msg, Sender);
        });

        // Audit Log (#23): forward a batch of site-local audit events to the
        // central cluster. The site SiteAuditTelemetryActor drains its SQLite
        // Pending queue through the ClusterClientSiteAuditClient, which Asks
        // this actor; the original Sender (that Ask) is passed as the
        // ClusterClient.Send sender so the IngestAuditEventsReply routes
        // straight back to the waiting Ask, not here. Mirrors NotificationSubmit.
        Receive<IngestAuditEventsCommand>(msg =>
        {
            if (_centralClient is not { } central)
            {
                // No ClusterClient registered yet (e.g. central contact points
                // not configured, or registration not yet completed). Faulting
                // the Ask makes the SiteAuditTelemetryActor drain loop treat
                // this as transient and keep the rows Pending for the next tick.
                _log.Warning(
                    "Cannot forward IngestAuditEventsCommand ({0} events) — no central ClusterClient registered",
                    msg.Events.Count);
                Sender.Tell(new Status.Failure(
                    new InvalidOperationException(CentralClientNotRegistered)));
                return;
            }

            _log.Debug("Forwarding IngestAuditEventsCommand ({0} events) to central", msg.Events.Count);
            SendToCentral(central, msg, Sender);
        });

        // Audit Log (#23) M3: forward a batch of combined cached-call telemetry
        // packets to the central cluster. Same forward + reply-routing pattern
        // as IngestAuditEventsCommand; central replies with an
        // IngestCachedTelemetryReply.
        Receive<IngestCachedTelemetryCommand>(msg =>
        {
            if (_centralClient is not { } central)
            {
                _log.Warning(
                    "Cannot forward IngestCachedTelemetryCommand ({0} entries) — no central ClusterClient registered",
                    msg.Entries.Count);
                Sender.Tell(new Status.Failure(
                    new InvalidOperationException(CentralClientNotRegistered)));
                return;
            }

            _log.Debug("Forwarding IngestCachedTelemetryCommand ({0} entries) to central", msg.Entries.Count);
            SendToCentral(central, msg, Sender);
        });

        // Internal: send heartbeat tick
        Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());

        // Internal: forward health report to central (silently skipped while no
        // ClusterClient is registered)
        Receive<SiteHealthReport>(msg =>
        {
            if (_centralClient is { } central)
            {
                SendToCentral(central, msg, Self);
            }
        });
    }

    /// <inheritdoc />
    protected override SupervisorStrategy SupervisorStrategy()
    {
        // Unlimited retries, no time window: every child failure is logged and resumed.
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: Timeout.InfiniteTimeSpan,
            decider: Decider.From(ex =>
            {
                _log.Warning(ex, "Child actor of SiteCommunicationActor faulted, resuming (state preserved)");
                return Directive.Resume;
            }));
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        _log.Info("SiteCommunicationActor started for site {0}", _siteId);

        // Schedule periodic heartbeat to central
        Timers.StartPeriodicTimer(
            HeartbeatTimerKey,
            new SendHeartbeat(),
            TimeSpan.FromSeconds(1), // initial delay
            _options.TransportHeartbeatInterval);
    }

    /// <summary>
    /// Forwards <paramref name="message"/> to <paramref name="handler"/>, keeping the
    /// original sender, when a handler is registered.
    /// </summary>
    /// <param name="handler">The registered local handler, or <c>null</c> when none is registered.</param>
    /// <param name="message">The message to forward.</param>
    /// <returns><c>true</c> if the message was forwarded; <c>false</c> if no handler is registered and the caller must reply itself.</returns>
    private static bool TryForward(IActorRef? handler, object message)
    {
        if (handler is null)
        {
            return false;
        }

        handler.Forward(message);
        return true;
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the central communication actor through the
    /// given ClusterClient, using <paramref name="replyTo"/> as the sender of the send.
    /// </summary>
    /// <param name="centralClient">The registered central ClusterClient.</param>
    /// <param name="message">The payload to deliver to central.</param>
    /// <param name="replyTo">The actor passed as sender, which receives any reply.</param>
    private static void SendToCentral(IActorRef centralClient, object message, IActorRef replyTo) =>
        centralClient.Tell(new ClusterClient.Send(CentralCommunicationPath, message), replyTo);

    /// <summary>
    /// Stores the handler from a <see cref="RegisterLocalHandler"/> message in the slot
    /// matching its <see cref="LocalHandlerType"/>. An unrecognised type is logged as a
    /// warning and ignored rather than being reported as registered.
    /// </summary>
    private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
    {
        switch (msg.HandlerType)
        {
            case LocalHandlerType.EventLog:
                _eventLogHandler = msg.Handler;
                break;
            case LocalHandlerType.ParkedMessages:
                _parkedMessageHandler = msg.Handler;
                break;
            case LocalHandlerType.Integration:
                _integrationHandler = msg.Handler;
                break;
            case LocalHandlerType.Artifacts:
                _artifactHandler = msg.Handler;
                break;
            default:
                // Nothing was stored, so do not claim a successful registration.
                _log.Warning("Ignoring registration for unknown local handler type {0}", msg.HandlerType);
                return;
        }

        _log.Info("Registered local handler for {0}", msg.HandlerType);
    }

    /// <summary>
    /// Builds a <see cref="HeartbeatMessage"/> for this node and sends it to central.
    /// Does nothing while no central ClusterClient is registered.
    /// </summary>
    private void SendHeartbeatToCentral()
    {
        if (_centralClient is not { } central)
        {
            return;
        }

        var hostname = Environment.MachineName;

        // Communication-018: stamp HeartbeatMessage.IsActive with this node's
        // true active/standby role rather than hard-coding `true`. The field is
        // part of the wire contract (additive-only-evolution) so a future
        // central health dashboard can distinguish "active node down, standby
        // up" from "site fully offline" without a new message type.
        bool isActive;
        try
        {
            isActive = _isActiveCheck();
        }
        catch (Exception ex)
        {
            // Defensive: never let a cluster-state read failure abort the
            // heartbeat itself (heartbeats are health signal — their absence is
            // already meaningful). Fall back to the safest non-claiming value:
            // standby. Logged at Debug because this path normally only fires
            // during ActorSystem warm-up.
            _log.Debug(ex,
                "Active-node check threw while sending heartbeat for site {0}; reporting IsActive=false",
                _siteId);
            isActive = false;
        }

        var heartbeat = new HeartbeatMessage(
            _siteId,
            hostname,
            IsActive: isActive,
            DateTimeOffset.UtcNow);

        SendToCentral(central, heartbeat, Self);
    }

    /// <summary>
    /// Communication-018: default active-node check used when no override is
    /// supplied. Mirrors <c>ActiveNodeGate</c> in the Host (and
    /// <c>ActiveNodeHealthCheck</c>): the node is the active member of the
    /// site cluster when it is the current cluster leader AND its own
    /// <see cref="MemberStatus"/> is <see cref="MemberStatus.Up"/>. Any other
    /// state (still joining, leaving, no leader yet) reports standby —
    /// safe-by-default, matching the standby case.
    /// </summary>
    private bool DefaultIsActiveCheck()
    {
        var cluster = Cluster.Get(Context.System);
        var self = cluster.SelfMember;
        if (self.Status != MemberStatus.Up)
        {
            return false;
        }

        var leader = cluster.State.Leader;
        return leader != null && leader == self.Address;
    }

    // ── Internal messages ──

    /// <summary>Timer tick that triggers sending one heartbeat to central.</summary>
    internal record SendHeartbeat;
}
/// <summary>
/// Command to register a ClusterClient for communicating with the central cluster.
/// </summary>
/// <param name="Client">The ClusterClient actor that site→central messages are sent through.</param>
public record RegisterCentralClient(IActorRef Client);
/// <summary>
/// Command to register a local actor as a handler for a specific message pattern.
/// </summary>
/// <param name="HandlerType">The message pattern the handler is responsible for.</param>
/// <param name="Handler">The local actor that messages of that pattern are forwarded to.</param>
public record RegisterLocalHandler(LocalHandlerType HandlerType, IActorRef Handler);
/// <summary>
/// Identifies which local handler slot of <see cref="SiteCommunicationActor"/> a
/// <see cref="RegisterLocalHandler"/> message fills.
/// </summary>
public enum LocalHandlerType
{
/// <summary>Handler that receives event log query requests.</summary>
EventLog,
/// <summary>Handler that receives parked message query/retry/discard requests and parked operation retry/discard commands.</summary>
ParkedMessages,
/// <summary>Handler that receives integration call requests.</summary>
Integration,
/// <summary>Handler that receives artifact deployment commands.</summary>
Artifacts
}
@@ -0,0 +1,108 @@
using System.Threading.Channels;
using Akka.Actor;
using Akka.Event;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using AlarmState = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState;
using AlarmLevel = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmLevel;
namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// <summary>
/// Relay actor that converts Akka domain events (AttributeValueChanged,
/// AlarmStateChanged) into protobuf SiteStreamEvent messages and writes them to a
/// System.Threading.Channels.Channel. The gRPC server method reads from the
/// channel's reader side.
/// </summary>
public class StreamRelayActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly string _correlationId;
    private readonly ChannelWriter<SiteStreamEvent> _channelWriter;

    /// <summary>
    /// Creates a relay bound to one gRPC stream correlation.
    /// </summary>
    /// <param name="correlationId">Correlation id stamped on every relayed <see cref="SiteStreamEvent"/>.</param>
    /// <param name="channelWriter">Channel writer that receives the converted events.</param>
    public StreamRelayActor(string correlationId, ChannelWriter<SiteStreamEvent> channelWriter)
    {
        _correlationId = correlationId;
        _channelWriter = channelWriter;

        Receive<AttributeValueChanged>(change => Emit(new SiteStreamEvent
        {
            CorrelationId = _correlationId,
            AttributeChanged = ToAttributeUpdate(change)
        }));

        Receive<AlarmStateChanged>(change => Emit(new SiteStreamEvent
        {
            CorrelationId = _correlationId,
            AlarmChanged = ToAlarmUpdate(change)
        }));
    }

    /// <summary>Converts an attribute change into its protobuf payload.</summary>
    private static AttributeValueUpdate ToAttributeUpdate(AttributeValueChanged change) => new()
    {
        InstanceUniqueName = change.InstanceUniqueName,
        AttributePath = change.AttributePath,
        AttributeName = change.AttributeName,
        Value = ValueFormatter.FormatDisplayValue(change.Value),
        Quality = MapQuality(change.Quality),
        Timestamp = Timestamp.FromDateTimeOffset(change.Timestamp)
    };

    /// <summary>Converts an alarm state change into its protobuf payload; a null message becomes an empty string.</summary>
    private static AlarmStateUpdate ToAlarmUpdate(AlarmStateChanged change) => new()
    {
        InstanceUniqueName = change.InstanceUniqueName,
        AlarmName = change.AlarmName,
        State = MapAlarmState(change.State),
        Priority = change.Priority,
        Timestamp = Timestamp.FromDateTimeOffset(change.Timestamp),
        Level = MapAlarmLevel(change.Level),
        Message = change.Message ?? string.Empty
    };

    /// <summary>
    /// Attempts a non-blocking write to the channel; a rejected event is dropped
    /// and a warning is logged.
    /// </summary>
    private void Emit(SiteStreamEvent streamEvent)
    {
        var accepted = _channelWriter.TryWrite(streamEvent);
        if (accepted)
        {
            return;
        }

        _log.Warning("Channel full, dropping event for correlation {0}", _correlationId);
    }

    /// <summary>Maps the textual quality to the protobuf enum; anything unrecognised is Unspecified.</summary>
    private static Quality MapQuality(string quality)
    {
        switch (quality)
        {
            case "Good":
                return Quality.Good;
            case "Uncertain":
                return Quality.Uncertain;
            case "Bad":
                return Quality.Bad;
            default:
                return Quality.Unspecified;
        }
    }

    /// <summary>Maps the domain alarm state to the protobuf enum; anything else is Unspecified.</summary>
    private static AlarmStateEnum MapAlarmState(AlarmState state)
    {
        switch (state)
        {
            case AlarmState.Normal:
                return AlarmStateEnum.AlarmStateNormal;
            case AlarmState.Active:
                return AlarmStateEnum.AlarmStateActive;
            default:
                return AlarmStateEnum.AlarmStateUnspecified;
        }
    }

    /// <summary>Maps the domain alarm level to the protobuf enum; anything else is None.</summary>
    private static AlarmLevelEnum MapAlarmLevel(AlarmLevel level)
    {
        switch (level)
        {
            case AlarmLevel.Low:
                return AlarmLevelEnum.AlarmLevelLow;
            case AlarmLevel.LowLow:
                return AlarmLevelEnum.AlarmLevelLowLow;
            case AlarmLevel.High:
                return AlarmLevelEnum.AlarmLevelHigh;
            case AlarmLevel.HighHigh:
                return AlarmLevelEnum.AlarmLevelHighHigh;
            default:
                return AlarmLevelEnum.AlarmLevelNone;
        }
    }
}
@@ -0,0 +1,61 @@
namespace ZB.MOM.WW.ScadaBridge.Communication;
/// <summary>
/// Settings for central-site communication: one timeout per message pattern,
/// the central contact points, gRPC streaming limits, and transport heartbeat values.
/// </summary>
public class CommunicationOptions
{
    /// <summary>How long to wait for a deployment command (typically the longest, due to apply logic). Default: 2 minutes.</summary>
    public TimeSpan DeploymentTimeout { get; set; } = TimeSpan.FromMinutes(2);

    /// <summary>How long to wait for a lifecycle command (disable, enable, delete). Default: 30 seconds.</summary>
    public TimeSpan LifecycleTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>How long to wait for an artifact deployment command. Default: 1 minute.</summary>
    public TimeSpan ArtifactDeploymentTimeout { get; set; } = TimeSpan.FromMinutes(1);

    /// <summary>How long to wait for a remote query (event logs, parked messages). Default: 30 seconds.</summary>
    public TimeSpan QueryTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>How long to wait for a routed integration call. Default: 30 seconds.</summary>
    public TimeSpan IntegrationTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>How long to wait for the debug view subscribe/unsubscribe handshake. Default: 10 seconds.</summary>
    public TimeSpan DebugViewTimeout { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>Bound on health report acknowledgement (fire-and-forget, but bounded). Default: 10 seconds.</summary>
    public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Notification Outbox: how long to wait when forwarding a buffered notification
    /// to central for its <c>NotificationSubmitAck</c>. A timeout counts as a
    /// transient failure — the Store-and-Forward engine keeps the message buffered
    /// and retries the forward at the fixed retry interval. Default: 30 seconds.
    /// </summary>
    public TimeSpan NotificationForwardTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Addresses of the central cluster's contact points
    /// (e.g. "akka.tcp://scadabridge@central-a:8081"). Site nodes use them to
    /// create a ClusterClient for reaching central. Empty by default.
    /// </summary>
    public List<string> CentralContactPoints { get; set; } = new List<string>();

    /// <summary>Interval between gRPC keepalive pings on streaming connections. Default: 15 seconds.</summary>
    public TimeSpan GrpcKeepAlivePingDelay { get; set; } = TimeSpan.FromSeconds(15);

    /// <summary>gRPC keepalive ping timeout — the stream is considered dead if no response arrives within this period. Default: 10 seconds.</summary>
    public TimeSpan GrpcKeepAlivePingTimeout { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>Longest a single gRPC stream may live before the server forces re-establishment. Default: 4 hours.</summary>
    public TimeSpan GrpcMaxStreamLifetime { get; set; } = TimeSpan.FromHours(4);

    /// <summary>Cap on concurrent gRPC streaming subscriptions per site node. Default: 100.</summary>
    public int GrpcMaxConcurrentStreams { get; set; } = 100;

    /// <summary>Akka.Remote transport heartbeat interval. Default: 5 seconds.</summary>
    public TimeSpan TransportHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);

    /// <summary>Akka.Remote transport failure detection threshold. Default: 15 seconds.</summary>
    public TimeSpan TransportFailureThreshold { get; set; } = TimeSpan.FromSeconds(15);
}
@@ -0,0 +1,579 @@
using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Artifacts;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.InboundApi;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
namespace ZB.MOM.WW.ScadaBridge.Communication;
/// <summary>
/// Central-side service that wraps the Akka Ask pattern with per-pattern timeouts.
/// Provides a typed API for sending messages to sites and awaiting responses.
/// On connection drop, the ask times out (no central buffering per design).
/// </summary>
public class CommunicationService
{
private readonly CommunicationOptions _options;
private readonly ILogger<CommunicationService> _logger;
private IActorRef? _centralCommunicationActor;
private IActorRef? _notificationOutboxProxy;
private IActorRef? _siteCallAuditProxy;
/// <summary>
/// Creates the service from the bound communication options and a logger.
/// </summary>
/// <param name="options">Options wrapper whose <c>Value</c> supplies the per-pattern timeouts.</param>
/// <param name="logger">Logger used by this service.</param>
public CommunicationService(
    IOptions<CommunicationOptions> options,
    ILogger<CommunicationService> logger)
{
    var resolvedOptions = options.Value;

    _options = resolvedOptions;
    _logger = logger;
}
/// <summary>
/// Stores the central communication actor reference. Called during actor system startup.
/// </summary>
/// <param name="centralCommunicationActor">The central communication actor reference.</param>
public void SetCommunicationActor(IActorRef centralCommunicationActor) =>
    _centralCommunicationActor = centralCommunicationActor;
/// <summary>
/// Stores the notification-outbox singleton proxy reference. Called during actor
/// system startup. The outbox actor is central-local, so outbox calls Ask this
/// proxy directly (no SiteEnvelope routing).
/// </summary>
/// <param name="notificationOutboxProxy">The notification outbox proxy reference.</param>
public void SetNotificationOutbox(IActorRef notificationOutboxProxy) =>
    _notificationOutboxProxy = notificationOutboxProxy;
/// <summary>
/// Stores the Site Call Audit (#22) singleton proxy reference. Called during
/// actor system startup. The Site Call Audit actor is central-local, so Site
/// Calls read calls Ask this proxy directly (no SiteEnvelope routing), the
/// same pattern as <see cref="SetNotificationOutbox"/>.
/// </summary>
/// <param name="siteCallAuditProxy">The Site Call Audit proxy reference.</param>
public void SetSiteCallAudit(IActorRef siteCallAuditProxy) =>
    _siteCallAuditProxy = siteCallAuditProxy;
/// <summary>
/// Asks the central communication actor to refresh its site address cache from
/// the database right away. Fire-and-forget: no reply is awaited.
/// </summary>
public void RefreshSiteAddresses()
{
    var central = GetActor();
    central.Tell(new RefreshSiteAddresses());
}
/// <summary>
/// Returns the central communication actor reference.
/// </summary>
/// <exception cref="InvalidOperationException">The reference has not been set yet.</exception>
public IActorRef GetCommunicationActor()
{
    if (_centralCommunicationActor is { } central)
    {
        return central;
    }

    throw new InvalidOperationException("CommunicationService not initialized. CentralCommunicationActor not set.");
}
// Internal shorthand for GetCommunicationActor(); throws if the actor is not set.
private IActorRef GetActor()
{
    return GetCommunicationActor();
}
/// <summary>
/// Returns the notification-outbox proxy reference.
/// </summary>
/// <exception cref="InvalidOperationException">The reference has not been set yet.</exception>
private IActorRef GetNotificationOutbox()
{
    if (_notificationOutboxProxy is { } outbox)
    {
        return outbox;
    }

    throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set.");
}
/// <summary>
/// Returns the Site Call Audit proxy reference.
/// </summary>
/// <exception cref="InvalidOperationException">The reference has not been set yet.</exception>
private IActorRef GetSiteCallAudit()
{
    if (_siteCallAuditProxy is { } audit)
    {
        return audit;
    }

    throw new InvalidOperationException("CommunicationService not initialized. SiteCallAudit proxy not set.");
}
// ── Pattern 1: Instance Deployment ──
/// <summary>
/// Wraps an instance deployment command in a <see cref="SiteEnvelope"/> for the given
/// site and Asks the central communication actor, waiting up to
/// <see cref="CommunicationOptions.DeploymentTimeout"/> for the reply.
/// </summary>
/// <param name="siteId">The target site identifier.</param>
/// <param name="command">The deployment command.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The deployment status response.</returns>
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
    string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
    _logger.LogDebug(
        "Sending DeployInstanceCommand to site {SiteId}, instance={Instance}, correlationId={DeploymentId}",
        siteId, command.InstanceUniqueName, command.DeploymentId);

    var message = new SiteEnvelope(siteId, command);
    var central = GetActor();
    var timeout = _options.DeploymentTimeout;
    return await central.Ask<DeploymentStatusResponse>(message, timeout, cancellationToken);
}
/// <summary>
/// DeploymentManager-006: asks a site which deployment identity is currently
/// applied for a single instance. The Deployment Manager uses it before a
/// re-deploy to reconcile against the site's actual state. The request travels
/// over the existing ClusterClient command/control transport; if the site is
/// unreachable the Ask times out (no central buffering) and the caller falls
/// through to a normal deploy. Bounded by <see cref="CommunicationOptions.QueryTimeout"/>.
/// </summary>
/// <param name="siteId">The target site identifier.</param>
/// <param name="request">The deployment state query request.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The deployment state query response.</returns>
public async Task<DeploymentStateQueryResponse> QueryDeploymentStateAsync(
    string siteId, DeploymentStateQueryRequest request, CancellationToken cancellationToken = default)
{
    _logger.LogDebug(
        "Sending DeploymentStateQueryRequest to site {SiteId}, instance={Instance}, correlationId={CorrelationId}",
        siteId, request.InstanceUniqueName, request.CorrelationId);

    var message = new SiteEnvelope(siteId, request);
    var central = GetActor();
    var timeout = _options.QueryTimeout;
    return await central.Ask<DeploymentStateQueryResponse>(message, timeout, cancellationToken);
}
// ── Pattern 2: Lifecycle ──
/// <summary>
/// Wraps a disable command in a <see cref="SiteEnvelope"/> for the given site and
/// Asks the central communication actor, waiting up to
/// <see cref="CommunicationOptions.LifecycleTimeout"/> for the reply.
/// </summary>
/// <param name="siteId">The target site identifier.</param>
/// <param name="command">The disable command.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The instance lifecycle response.</returns>
public async Task<InstanceLifecycleResponse> DisableInstanceAsync(
    string siteId, DisableInstanceCommand command, CancellationToken cancellationToken = default)
{
    var message = new SiteEnvelope(siteId, command);
    var central = GetActor();
    var timeout = _options.LifecycleTimeout;
    return await central.Ask<InstanceLifecycleResponse>(message, timeout, cancellationToken);
}
/// <summary>
/// Wraps an enable command in a <see cref="SiteEnvelope"/> for the given site and
/// Asks the central communication actor, waiting up to
/// <see cref="CommunicationOptions.LifecycleTimeout"/> for the reply.
/// </summary>
/// <param name="siteId">The target site identifier.</param>
/// <param name="command">The enable command.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The instance lifecycle response.</returns>
public async Task<InstanceLifecycleResponse> EnableInstanceAsync(
    string siteId, EnableInstanceCommand command, CancellationToken cancellationToken = default)
{
    var message = new SiteEnvelope(siteId, command);
    var central = GetActor();
    var timeout = _options.LifecycleTimeout;
    return await central.Ask<InstanceLifecycleResponse>(message, timeout, cancellationToken);
}
/// <summary>
/// Wraps a delete command in a <see cref="SiteEnvelope"/> for the given site and
/// Asks the central communication actor, waiting up to
/// <see cref="CommunicationOptions.LifecycleTimeout"/> for the reply.
/// </summary>
/// <param name="siteId">The target site identifier.</param>
/// <param name="command">The delete command.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The instance lifecycle response.</returns>
public async Task<InstanceLifecycleResponse> DeleteInstanceAsync(
    string siteId, DeleteInstanceCommand command, CancellationToken cancellationToken = default)
{
    var message = new SiteEnvelope(siteId, command);
    var central = GetActor();
    var timeout = _options.LifecycleTimeout;
    return await central.Ask<InstanceLifecycleResponse>(message, timeout, cancellationToken);
}
// ── Pattern 3: Artifact Deployment ──
/// <summary>
/// Delivers a system-wide artifact deployment command to a site and awaits the result.
/// The command is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.ArtifactDeploymentTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site that should receive the artifacts.</param>
/// <param name="command">The artifact deployment command to deliver.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="ArtifactDeploymentResponse"/> returned by the Ask.</returns>
public async Task<ArtifactDeploymentResponse> DeployArtifactsAsync(
    string siteId, DeployArtifactsCommand command, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<ArtifactDeploymentResponse>(
        new SiteEnvelope(siteId, command), _options.ArtifactDeploymentTimeout, cancellationToken);
    return reply;
}
// ── Pattern 4: Integration Routing ──
/// <summary>
/// Forwards an integration call to a site and awaits its response.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.IntegrationTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site that should handle the call.</param>
/// <param name="request">The integration call request to forward.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="IntegrationCallResponse"/> returned by the Ask.</returns>
public async Task<IntegrationCallResponse> RouteIntegrationCallAsync(
    string siteId, IntegrationCallRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<IntegrationCallResponse>(
        new SiteEnvelope(siteId, request), _options.IntegrationTimeout, cancellationToken);
    return reply;
}
// ── Pattern 5: Debug View ──
/// <summary>
/// Sends a debug view subscription request to a site and awaits the snapshot reply.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.DebugViewTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site to subscribe on.</param>
/// <param name="request">The debug view subscription request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="DebugViewSnapshot"/> returned by the Ask.</returns>
public async Task<DebugViewSnapshot> SubscribeDebugViewAsync(
    string siteId, SubscribeDebugViewRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var snapshot = await target.Ask<DebugViewSnapshot>(
        new SiteEnvelope(siteId, request), _options.DebugViewTimeout, cancellationToken);
    return snapshot;
}
/// <summary>
/// Sends a debug view unsubscription request to a site without waiting for a reply.
/// </summary>
/// <param name="siteId">Identifier of the site holding the subscription.</param>
/// <param name="request">The debug view unsubscription request.</param>
public void UnsubscribeDebugView(string siteId, UnsubscribeDebugViewRequest request)
{
    // One-way Tell: nothing is awaited because no response is expected.
    var envelope = new SiteEnvelope(siteId, request);
    GetActor().Tell(envelope);
}
// ── Pattern 6a: Debug Snapshot (one-shot, request/response) ──
/// <summary>
/// Requests a one-shot debug view snapshot from a site and awaits it.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site to query.</param>
/// <param name="request">The debug snapshot request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="DebugViewSnapshot"/> returned by the Ask.</returns>
public async Task<DebugViewSnapshot> RequestDebugSnapshotAsync(
    string siteId, DebugSnapshotRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var snapshot = await target.Ask<DebugViewSnapshot>(
        new SiteEnvelope(siteId, request), _options.QueryTimeout, cancellationToken);
    return snapshot;
}
// ── Pattern 6b: Health Reporting (site→central, Tell) ──
// Health reports are received by central, not sent. No method needed here.
// ── Pattern 7: Remote Queries ──
/// <summary>
/// Sends an event log query to a site and awaits the result.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site to query.</param>
/// <param name="request">The event log query request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="EventLogQueryResponse"/> returned by the Ask.</returns>
public async Task<EventLogQueryResponse> QueryEventLogsAsync(
    string siteId, EventLogQueryRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<EventLogQueryResponse>(
        new SiteEnvelope(siteId, request), _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Sends a parked message query to a site and awaits the result.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site to query.</param>
/// <param name="request">The parked message query request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="ParkedMessageQueryResponse"/> returned by the Ask.</returns>
public async Task<ParkedMessageQueryResponse> QueryParkedMessagesAsync(
    string siteId, ParkedMessageQueryRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<ParkedMessageQueryResponse>(
        new SiteEnvelope(siteId, request), _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks a site to retry one of its parked messages and awaits the outcome.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site holding the parked message.</param>
/// <param name="request">The parked message retry request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="ParkedMessageRetryResponse"/> returned by the Ask.</returns>
public async Task<ParkedMessageRetryResponse> RetryParkedMessageAsync(
    string siteId, ParkedMessageRetryRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<ParkedMessageRetryResponse>(
        new SiteEnvelope(siteId, request), _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks a site to discard one of its parked messages and awaits the outcome.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site holding the parked message.</param>
/// <param name="request">The parked message discard request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="ParkedMessageDiscardResponse"/> returned by the Ask.</returns>
public async Task<ParkedMessageDiscardResponse> DiscardParkedMessageAsync(
    string siteId, ParkedMessageDiscardRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<ParkedMessageDiscardResponse>(
        new SiteEnvelope(siteId, request), _options.QueryTimeout, cancellationToken);
    return reply;
}
// ── Pattern 8: Heartbeat (site→central, Tell) ──
// Heartbeats are received by central, not sent. No method needed here.
// ── Inbound API Cross-Site Routing (WP-4) ──
/// <summary>
/// Forwards an inbound API call to a site and awaits its response.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.IntegrationTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site that should handle the call.</param>
/// <param name="request">The call route request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="RouteToCallResponse"/> returned by the Ask.</returns>
public async Task<RouteToCallResponse> RouteToCallAsync(
    string siteId, RouteToCallRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<RouteToCallResponse>(
        new SiteEnvelope(siteId, request), _options.IntegrationTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Forwards an inbound API get-attributes request to a site and awaits its response.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.IntegrationTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site that should handle the request.</param>
/// <param name="request">The get-attributes route request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="RouteToGetAttributesResponse"/> returned by the Ask.</returns>
public async Task<RouteToGetAttributesResponse> RouteToGetAttributesAsync(
    string siteId, RouteToGetAttributesRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<RouteToGetAttributesResponse>(
        new SiteEnvelope(siteId, request), _options.IntegrationTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Forwards an inbound API set-attributes request to a site and awaits its response.
/// The request is wrapped in a <see cref="SiteEnvelope"/>; the Ask is bounded by
/// <c>_options.IntegrationTimeout</c>.
/// </summary>
/// <param name="siteId">Identifier of the site that should handle the request.</param>
/// <param name="request">The set-attributes route request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="RouteToSetAttributesResponse"/> returned by the Ask.</returns>
public async Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
    string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken = default)
{
    var target = GetActor();
    var reply = await target.Ask<RouteToSetAttributesResponse>(
        new SiteEnvelope(siteId, request), _options.IntegrationTimeout, cancellationToken);
    return reply;
}
// ── Notification Outbox (central-local actor — Asked directly, no SiteEnvelope) ──
/// <summary>
/// Sends a query to the notification outbox actor and awaits the result.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The notification outbox query request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="NotificationOutboxQueryResponse"/> returned by the Ask.</returns>
public async Task<NotificationOutboxQueryResponse> QueryNotificationOutboxAsync(
    NotificationOutboxQueryRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<NotificationOutboxQueryResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the notification outbox actor to retry a notification and awaits the outcome.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The retry notification request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="RetryNotificationResponse"/> returned by the Ask.</returns>
public async Task<RetryNotificationResponse> RetryNotificationAsync(
    RetryNotificationRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<RetryNotificationResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the notification outbox actor to discard a notification and awaits the outcome.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The discard notification request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="DiscardNotificationResponse"/> returned by the Ask.</returns>
public async Task<DiscardNotificationResponse> DiscardNotificationAsync(
    DiscardNotificationRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<DiscardNotificationResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the notification outbox actor for the details of one notification.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The notification detail request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="NotificationDetailResponse"/> returned by the Ask.</returns>
public async Task<NotificationDetailResponse> GetNotificationDetailAsync(
    NotificationDetailRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<NotificationDetailResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the notification outbox actor for its KPI metrics.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The notification KPI request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="NotificationKpiResponse"/> returned by the Ask.</returns>
public async Task<NotificationKpiResponse> GetNotificationKpisAsync(
    NotificationKpiRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<NotificationKpiResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the notification outbox actor for its KPI metrics broken down per site.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The per-site notification KPI request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="PerSiteNotificationKpiResponse"/> returned by the Ask.</returns>
public async Task<PerSiteNotificationKpiResponse> GetPerSiteNotificationKpisAsync(
    PerSiteNotificationKpiRequest request, CancellationToken cancellationToken = default)
{
    var outbox = GetNotificationOutbox();
    var reply = await outbox.Ask<PerSiteNotificationKpiResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
// ── Site Call Audit (central-local actor — Asked directly, no SiteEnvelope) ──
/// <summary>
/// Sends a site call audit query to the site call audit actor and awaits the result.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The site call query request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="SiteCallQueryResponse"/> returned by the Ask.</returns>
public async Task<SiteCallQueryResponse> QuerySiteCallsAsync(
    SiteCallQueryRequest request, CancellationToken cancellationToken = default)
{
    var audit = GetSiteCallAudit();
    var reply = await audit.Ask<SiteCallQueryResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the site call audit actor for the details of one site call.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The site call detail request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="SiteCallDetailResponse"/> returned by the Ask.</returns>
public async Task<SiteCallDetailResponse> GetSiteCallDetailAsync(
    SiteCallDetailRequest request, CancellationToken cancellationToken = default)
{
    var audit = GetSiteCallAudit();
    var reply = await audit.Ask<SiteCallDetailResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the site call audit actor for site call KPI metrics.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The site call KPI request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="SiteCallKpiResponse"/> returned by the Ask.</returns>
public async Task<SiteCallKpiResponse> GetSiteCallKpisAsync(
    SiteCallKpiRequest request, CancellationToken cancellationToken = default)
{
    var audit = GetSiteCallAudit();
    var reply = await audit.Ask<SiteCallKpiResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Asks the site call audit actor for site call KPI metrics broken down per site.
/// The request is Asked directly (no <see cref="SiteEnvelope"/>), bounded by
/// <c>_options.QueryTimeout</c>.
/// </summary>
/// <param name="request">The per-site site call KPI request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="PerSiteSiteCallKpiResponse"/> returned by the Ask.</returns>
public async Task<PerSiteSiteCallKpiResponse> GetPerSiteSiteCallKpisAsync(
    PerSiteSiteCallKpiRequest request, CancellationToken cancellationToken = default)
{
    var audit = GetSiteCallAudit();
    var reply = await audit.Ask<PerSiteSiteCallKpiResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
/// <summary>
/// Task 5 (#22): relays an operator Retry of a parked cached call to its
/// owning site. The <c>SiteCallAuditActor</c> is Asked directly (it is
/// central-local); it in turn relays a <c>RetryParkedOperation</c> to the
/// owning site and replies a <see cref="RetrySiteCallResponse"/> carrying a
/// distinct site-unreachable outcome. Central never mutates the central
/// <c>SiteCalls</c> mirror row.
/// <para>
/// This outer Ask uses <see cref="CommunicationOptions.QueryTimeout"/>
/// (default 30s), which must outlive the inner site relay Ask the
/// <c>SiteCallAuditActor</c> issues with <c>SiteCallAuditOptions.RelayTimeout</c>
/// (default 10s). The inner relay must time out first so its distinct
/// <c>SiteUnreachable</c> outcome reaches us; were this outer Ask to expire
/// first, that outcome would be lost to a generic Ask-timeout exception.
/// </para>
/// </summary>
/// <param name="request">The retry site call request.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The retry site call response.</returns>
public async Task<RetrySiteCallResponse> RetrySiteCallAsync(
RetrySiteCallRequest request, CancellationToken cancellationToken = default)
{
return await GetSiteCallAudit().Ask<RetrySiteCallResponse>(
request, _options.QueryTimeout, cancellationToken);
}
/// <summary>
/// Task 5 (#22): forwards an operator Discard of a parked cached call towards the
/// site that owns it. Routing and source-of-truth rationale are the same as for
/// <see cref="RetrySiteCallAsync"/>.
/// </summary>
/// <param name="request">The discard site call request.</param>
/// <param name="cancellationToken">Token forwarded to the Ask so the wait can be cancelled.</param>
/// <returns>The <see cref="DiscardSiteCallResponse"/> returned by the Ask.</returns>
public async Task<DiscardSiteCallResponse> DiscardSiteCallAsync(
    DiscardSiteCallRequest request, CancellationToken cancellationToken = default)
{
    var audit = GetSiteCallAudit();
    var reply = await audit.Ask<DiscardSiteCallResponse>(
        request, _options.QueryTimeout, cancellationToken);
    return reply;
}
}
/// <summary>
/// Envelope that wraps any message with a target site ID for routing.
/// Used by CentralCommunicationActor to resolve the site actor path.
/// </summary>
/// <param name="SiteId">Identifier of the site the wrapped message is addressed to.</param>
/// <param name="Message">The payload to route; typed as <see cref="object"/> so any message can be wrapped.</param>
public record SiteEnvelope(string SiteId, object Message);
@@ -0,0 +1,189 @@
using System.Collections.Concurrent;
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication;
/// <summary>
/// Manages debug stream sessions by creating DebugStreamBridgeActors that persist
/// as subscribers on the site side. Both the Blazor debug view and the SignalR hub
/// use this service to start/stop streams.
/// </summary>
public class DebugStreamService
{
    /// <summary>Maximum time to wait for the initial snapshot after the bridge actor is created.</summary>
    private static readonly TimeSpan SnapshotTimeout = TimeSpan.FromSeconds(30);

    private readonly CommunicationService _communicationService;
    private readonly IServiceProvider _serviceProvider;
    private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
    private readonly ILogger<DebugStreamService> _logger;

    // Active sessions keyed by session ID; the value is the bridge actor serving that session.
    private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
    private ActorSystem? _actorSystem;

    /// <summary>
    /// Initializes a new instance of the <see cref="DebugStreamService"/> class.
    /// </summary>
    /// <param name="communicationService">The communication service.</param>
    /// <param name="serviceProvider">The service provider for dependency resolution.</param>
    /// <param name="grpcClientFactory">The gRPC client factory for creating site stream clients.</param>
    /// <param name="logger">The logger instance.</param>
    public DebugStreamService(
        CommunicationService communicationService,
        IServiceProvider serviceProvider,
        SiteStreamGrpcClientFactory grpcClientFactory,
        ILogger<DebugStreamService> logger)
    {
        _communicationService = communicationService;
        _serviceProvider = serviceProvider;
        _grpcClientFactory = grpcClientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Sets the ActorSystem reference. Called during actor system startup (from AkkaHostedService).
    /// </summary>
    /// <param name="actorSystem">The actor system to use for creating bridge actors.</param>
    public void SetActorSystem(ActorSystem actorSystem)
    {
        _actorSystem = actorSystem;
    }

    /// <summary>
    /// Starts a debug stream session. Returns the initial snapshot.
    /// Ongoing events are delivered via the onEvent callback.
    /// The onTerminated callback fires if the stream is killed (site disconnect, timeout).
    /// </summary>
    /// <param name="instanceId">The instance ID to stream debug information for.</param>
    /// <param name="onEvent">Callback invoked for each event received from the stream.</param>
    /// <param name="onTerminated">Callback invoked when the stream terminates.</param>
    /// <param name="ct">A cancellation token that can be used to cancel the operation.</param>
    /// <returns>A debug stream session with the initial snapshot.</returns>
    /// <exception cref="InvalidOperationException">
    /// The actor system has not been set, the instance or its site cannot be found, the site
    /// has no gRPC node address configured, or the stream terminated before a snapshot arrived.
    /// </exception>
    /// <exception cref="TimeoutException">No snapshot arrived within the snapshot timeout.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled while waiting for the snapshot.
    /// </exception>
    public async Task<DebugStreamSession> StartStreamAsync(
        int instanceId,
        Action<object> onEvent,
        Action onTerminated,
        CancellationToken ct = default)
    {
        var system = _actorSystem
            ?? throw new InvalidOperationException("DebugStreamService not initialized. ActorSystem not set.");

        // Resolve instance → unique name + site
        string instanceUniqueName;
        string siteIdentifier;
        string grpcNodeAAddress;
        string grpcNodeBAddress;
        using (var scope = _serviceProvider.CreateScope())
        {
            var instanceRepo = scope.ServiceProvider.GetRequiredService<ITemplateEngineRepository>();
            var instance = await instanceRepo.GetInstanceByIdAsync(instanceId)
                ?? throw new InvalidOperationException($"Instance {instanceId} not found.");

            var siteRepo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
            var site = await siteRepo.GetSiteByIdAsync(instance.SiteId)
                ?? throw new InvalidOperationException($"Site {instance.SiteId} not found.");

            instanceUniqueName = instance.UniqueName;
            siteIdentifier = site.SiteIdentifier;
            grpcNodeAAddress = site.GrpcNodeAAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
            grpcNodeBAddress = site.GrpcNodeBAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
        }

        var sessionId = Guid.NewGuid().ToString("N");

        // Capture the initial snapshot via a TaskCompletionSource
        var snapshotTcs = new TaskCompletionSource<DebugViewSnapshot>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The first DebugViewSnapshot completes the TCS; everything else (including any
        // later snapshots) is passed through to the caller's callback.
        Action<object> onEventWrapper = evt =>
        {
            if (evt is DebugViewSnapshot initial && !snapshotTcs.Task.IsCompleted)
            {
                snapshotTcs.TrySetResult(initial);
            }
            else
            {
                onEvent(evt);
            }
        };

        Action onTerminatedWrapper = () =>
        {
            _sessions.TryRemove(sessionId, out _);
            // No-op once the snapshot has been delivered; otherwise unblocks the waiter below.
            snapshotTcs.TrySetException(new InvalidOperationException("Debug stream terminated before snapshot received."));
            onTerminated();
        };

        // Create the bridge actor — use type-based Props to avoid expression tree limitations with closures
        var commActor = _communicationService.GetCommunicationActor();
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            siteIdentifier,
            instanceUniqueName,
            sessionId,
            commActor,
            onEventWrapper,
            onTerminatedWrapper,
            _grpcClientFactory,
            grpcNodeAAddress,
            grpcNodeBAddress);

        var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
        _sessions[sessionId] = bridgeActor;

        // Wait for the initial snapshot (with timeout)
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(SnapshotTimeout);

        DebugViewSnapshot snapshot;
        try
        {
            snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
        }
        catch (Exception ex)
        {
            // Any failure before the snapshot arrives — the 30s timeout, caller cancellation,
            // or the stream terminating early (site disconnect / gRPC failure, surfaced by
            // onTerminatedWrapper as an InvalidOperationException) — must deterministically
            // tear down the bridge actor and its site-side subscription. Use the local
            // actor reference: a racing onTerminatedWrapper may already have removed the
            // session, which would make StopStream a no-op. StopDebugStream is idempotent
            // (the actor may already be stopping itself).
            _sessions.TryRemove(sessionId, out _);
            bridgeActor.Tell(new StopDebugStream());

            if (ex is OperationCanceledException)
            {
                // The linked token fires for both the caller's token and the deadline.
                // Only the deadline is a timeout; caller cancellation must surface as
                // cancellation rather than being misreported as a timeout.
                ct.ThrowIfCancellationRequested();

                throw new TimeoutException(
                    $"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.",
                    ex);
            }

            throw new InvalidOperationException(
                $"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
                ex);
        }

        _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
            sessionId, instanceUniqueName, siteIdentifier);

        return new DebugStreamSession(sessionId, snapshot);
    }

    /// <summary>
    /// Stops an active debug stream session. Does nothing when the session ID is unknown
    /// (for example because the stream already terminated).
    /// </summary>
    /// <param name="sessionId">The session ID of the debug stream to stop.</param>
    public void StopStream(string sessionId)
    {
        if (_sessions.TryRemove(sessionId, out var bridgeActor))
        {
            bridgeActor.Tell(new StopDebugStream());
            _logger.LogInformation("Debug stream {SessionId} stopped", sessionId);
        }
    }
}
/// <summary>
/// Result of starting a debug stream: the session identifier together with the
/// first snapshot received for it.
/// </summary>
/// <param name="SessionId">Identifier of the debug stream session.</param>
/// <param name="InitialSnapshot">The initial <see cref="DebugViewSnapshot"/> of the stream.</param>
public record DebugStreamSession(string SessionId, DebugViewSnapshot InitialSnapshot);
@@ -0,0 +1,128 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// Canonical bridge for Audit Log (#23) rows between the in-process
/// <see cref="AuditEvent"/> record and the wire-format <see cref="AuditEventDto"/>
/// exchanged over the <c>IngestAuditEvents</c>, <c>IngestCachedTelemetry</c> and
/// <c>PullAuditEvents</c> RPCs.
/// </summary>
/// <remarks>
/// <para>
/// This mapper lives in <c>ZB.MOM.WW.ScadaBridge.Communication</c> (which owns the generated
/// <see cref="AuditEventDto"/> and references <c>Commons</c> for
/// <see cref="AuditEvent"/>) so both <c>SiteStreamGrpcServer</c> and
/// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> can share one implementation without the
/// project-reference cycle that would result from hosting it in
/// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> (AuditLog → Communication, never the reverse).
/// </para>
/// <para><b>Lossy by design:</b> the proto contract intentionally omits two fields.</para>
/// <list type="bullet">
/// <item><see cref="AuditEvent.ForwardState"/> — site-local SQLite state, never travels.</item>
/// <item><see cref="AuditEvent.IngestedAtUtc"/> — central-set at ingest time, not at the site.</item>
/// </list>
/// <para>
/// String nullability convention: proto3 scalar strings cannot be absent, so nullable
/// .NET strings round-trip as empty strings on the wire. Nullable integers use the
/// <c>Int32Value</c> wrapper so they preserve true null semantics.
/// </para>
/// </remarks>
public static class AuditEventDtoMapper
{
    /// <summary>
    /// Projects an <see cref="AuditEvent"/> into its wire-format DTO. Null reference
    /// fields collapse to empty strings; null integer fields leave the wrapper unset.
    /// </summary>
    /// <param name="evt">The audit event to project to wire format.</param>
    /// <returns>The populated wire-format DTO.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
    public static AuditEventDto ToDto(AuditEvent evt)
    {
        ArgumentNullException.ThrowIfNull(evt);

        var dto = new AuditEventDto
        {
            EventId = evt.EventId.ToString(),
            OccurredAtUtc = Timestamp.FromDateTime(EnsureUtc(evt.OccurredAtUtc)),
            Channel = evt.Channel.ToString(),
            Kind = evt.Kind.ToString(),
            CorrelationId = evt.CorrelationId?.ToString() ?? string.Empty,
            ExecutionId = evt.ExecutionId?.ToString() ?? string.Empty,
            ParentExecutionId = evt.ParentExecutionId?.ToString() ?? string.Empty,
            SourceSiteId = evt.SourceSiteId ?? string.Empty,
            SourceNode = evt.SourceNode ?? string.Empty,
            SourceInstanceId = evt.SourceInstanceId ?? string.Empty,
            SourceScript = evt.SourceScript ?? string.Empty,
            Actor = evt.Actor ?? string.Empty,
            Target = evt.Target ?? string.Empty,
            Status = evt.Status.ToString(),
            ErrorMessage = evt.ErrorMessage ?? string.Empty,
            ErrorDetail = evt.ErrorDetail ?? string.Empty,
            RequestSummary = evt.RequestSummary ?? string.Empty,
            ResponseSummary = evt.ResponseSummary ?? string.Empty,
            PayloadTruncated = evt.PayloadTruncated,
            Extra = evt.Extra ?? string.Empty
        };

        // Wrapper-typed fields are only assigned when a value exists, so null stays "unset".
        if (evt.HttpStatus.HasValue)
        {
            dto.HttpStatus = evt.HttpStatus.Value;
        }

        if (evt.DurationMs.HasValue)
        {
            dto.DurationMs = evt.DurationMs.Value;
        }

        return dto;
    }

    /// <summary>
    /// Reconstructs an <see cref="AuditEvent"/> from its wire-format DTO. Empty strings
    /// rehydrate as null reference values; absent integer wrappers stay null.
    /// <see cref="AuditEvent.ForwardState"/> and <see cref="AuditEvent.IngestedAtUtc"/>
    /// are intentionally left null — the central ingest actor sets the latter.
    /// </summary>
    /// <param name="dto">The wire-format DTO to reconstruct into an <see cref="AuditEvent"/>.</param>
    /// <returns>The reconstructed audit event.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="dto"/> is null.</exception>
    public static AuditEvent FromDto(AuditEventDto dto)
    {
        ArgumentNullException.ThrowIfNull(dto);

        return new AuditEvent
        {
            EventId = Guid.Parse(dto.EventId),
            OccurredAtUtc = DateTime.SpecifyKind(dto.OccurredAtUtc.ToDateTime(), DateTimeKind.Utc),
            IngestedAtUtc = null,
            Channel = Enum.Parse<AuditChannel>(dto.Channel),
            Kind = Enum.Parse<AuditKind>(dto.Kind),
            CorrelationId = NullIfEmpty(dto.CorrelationId) is { } cid ? Guid.Parse(cid) : null,
            ExecutionId = NullIfEmpty(dto.ExecutionId) is { } eid ? Guid.Parse(eid) : null,
            ParentExecutionId = NullIfEmpty(dto.ParentExecutionId) is { } pid ? Guid.Parse(pid) : null,
            SourceSiteId = NullIfEmpty(dto.SourceSiteId),
            SourceNode = NullIfEmpty(dto.SourceNode),
            SourceInstanceId = NullIfEmpty(dto.SourceInstanceId),
            SourceScript = NullIfEmpty(dto.SourceScript),
            Actor = NullIfEmpty(dto.Actor),
            Target = NullIfEmpty(dto.Target),
            Status = Enum.Parse<AuditStatus>(dto.Status),
            HttpStatus = dto.HttpStatus,
            DurationMs = dto.DurationMs,
            ErrorMessage = NullIfEmpty(dto.ErrorMessage),
            ErrorDetail = NullIfEmpty(dto.ErrorDetail),
            RequestSummary = NullIfEmpty(dto.RequestSummary),
            ResponseSummary = NullIfEmpty(dto.ResponseSummary),
            PayloadTruncated = dto.PayloadTruncated,
            Extra = NullIfEmpty(dto.Extra),
            ForwardState = null
        };
    }

    /// <summary>Maps null or empty strings to null; any other value is returned unchanged.</summary>
    private static string? NullIfEmpty(string? value) =>
        string.IsNullOrEmpty(value) ? null : value;

    /// <summary>
    /// Returns <paramref name="value"/> as a <see cref="DateTimeKind.Utc"/> instant, as
    /// required by <c>Timestamp.FromDateTime</c>.
    /// </summary>
    /// <remarks>
    /// Only <see cref="DateTimeKind.Local"/> values are converted. A
    /// <see cref="DateTimeKind.Unspecified"/> value is stamped as UTC without shifting:
    /// <see cref="DateTime.ToUniversalTime"/> would treat it as local time and move the
    /// instant by the host's UTC offset, which is wrong for a field that is UTC by contract.
    /// </remarks>
    private static DateTime EnsureUtc(DateTime value) => value.Kind switch
    {
        DateTimeKind.Utc => value,
        DateTimeKind.Local => value.ToUniversalTime(),
        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
    };
}
@@ -0,0 +1,25 @@
using Akka.Actor;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// Abstraction over the site-side stream subscription mechanism.
/// SiteStreamManager in the SiteRuntime project implements this interface;
/// the gRPC server depends on it without referencing SiteRuntime directly.
/// </summary>
public interface ISiteStreamSubscriber
{
/// <summary>
/// Subscribes an actor to receive filtered stream events for a specific instance.
/// </summary>
/// <param name="instanceName">The unique name of the instance whose events to subscribe to.</param>
/// <param name="subscriber">The actor reference that will receive stream event messages.</param>
/// <returns>A subscription ID that can be used for unsubscription.</returns>
string Subscribe(string instanceName, IActorRef subscriber);
/// <summary>
/// Removes all subscriptions held by the given actor, regardless of which
/// instance each subscription was created for.
/// </summary>
/// <param name="subscriber">The actor reference whose subscriptions should be removed.</param>
void RemoveSubscriber(IActorRef subscriber);
}
@@ -0,0 +1,72 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// Canonical bridge for Site Call Audit (#22) operational rows: converts the
/// wire-format <see cref="SiteCallOperationalDto"/> carried on the
/// <c>CachedCallTelemetry</c> packet into the in-process <see cref="SiteCall"/>
/// persistence entity that central writes into the <c>SiteCalls</c> table.
/// </summary>
/// <remarks>
/// <para>
/// The mapper is hosted in <c>ZB.MOM.WW.ScadaBridge.Communication</c> (owner of the generated
/// <see cref="SiteCallOperationalDto"/>, with a reference to <c>Commons</c> for
/// <see cref="SiteCall"/>) so that <c>SiteStreamGrpcServer</c> and
/// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> share a single implementation. Hosting it in
/// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> would create a project-reference cycle
/// (AuditLog → Communication, never the reverse).
/// It mirrors the sibling <see cref="AuditEventDtoMapper"/>.
/// </para>
/// <para>
/// Only DTO→entity is implemented. Nothing in the system maps a
/// <see cref="SiteCall"/> back onto the wire (sites emit operational state from
/// <c>SiteCallOperational</c>, never from the central <see cref="SiteCall"/>
/// entity), so an entity→DTO method would be dead code.
/// </para>
/// <para>
/// String nullability convention: proto3 scalar strings cannot be absent, so the
/// optional <see cref="SiteCall.LastError"/> is rehydrated from an empty string
/// back to null. The optional <c>HttpStatus</c> and <c>TerminalAtUtc</c> use proto
/// wrappers and therefore keep true null semantics.
/// </para>
/// </remarks>
public static class SiteCallDtoMapper
{
    /// <summary>
    /// Builds a <see cref="SiteCall"/> persistence entity from its wire-format DTO.
    /// An empty <c>LastError</c> (and an empty <c>SourceNode</c>) becomes null; absent
    /// <c>HttpStatus</c>/<c>TerminalAtUtc</c> wrappers stay null.
    /// </summary>
    /// <remarks>
    /// <see cref="SiteCall.IngestedAtUtc"/> is filled with a placeholder
    /// (<see cref="DateTime.UtcNow"/>); the central ingest actor overwrites it
    /// inside the dual-write transaction so the AuditLog and SiteCalls rows
    /// share one instant. The value sent on the wire is informational only.
    /// </remarks>
    /// <param name="dto">The wire-format site call DTO to map.</param>
    public static SiteCall FromDto(SiteCallOperationalDto dto)
    {
        ArgumentNullException.ThrowIfNull(dto);

        return new SiteCall
        {
            TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId),
            Channel = dto.Channel,
            Target = dto.Target,
            SourceSite = dto.SourceSite,
            SourceNode = EmptyToNull(dto.SourceNode),
            Status = dto.Status,
            RetryCount = dto.RetryCount,
            LastError = EmptyToNull(dto.LastError),
            HttpStatus = dto.HttpStatus,
            CreatedAtUtc = StampUtc(dto.CreatedAtUtc.ToDateTime()),
            UpdatedAtUtc = StampUtc(dto.UpdatedAtUtc.ToDateTime()),
            TerminalAtUtc = dto.TerminalAtUtc is { } terminal
                ? StampUtc(terminal.ToDateTime())
                : null,
            IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor
        };
    }

    /// <summary>Maps null or empty strings to null; any other value passes through.</summary>
    private static string? EmptyToNull(string? text) =>
        string.IsNullOrEmpty(text) ? null : text;

    /// <summary>Marks a timestamp as <see cref="DateTimeKind.Utc"/> without shifting its value.</summary>
    private static DateTime StampUtc(DateTime moment) =>
        DateTime.SpecifyKind(moment, DateTimeKind.Utc);
}
@@ -0,0 +1,316 @@
using System.Collections.Concurrent;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using Google.Protobuf.WellKnownTypes;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// Per-site gRPC client that manages streaming subscriptions to a site's
/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this
/// to open server-streaming calls for individual instances.
/// </summary>
/// <remarks>
/// Every subscription owns one <see cref="CancellationTokenSource"/> stored in a
/// per-correlation-ID slot. The code path that takes a CTS out of its slot
/// (replacement, <see cref="Unsubscribe"/>, stream completion, or disposal) is the
/// one that cancels/disposes it; <see cref="RemoveSubscription"/> only removes.
/// </remarks>
public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable
{
    private readonly GrpcChannel? _channel;
    private readonly SiteStreamService.SiteStreamServiceClient? _client;
    private readonly ILogger? _logger;
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();

    /// <summary>
    /// The gRPC endpoint (site node address) this client is bound to. The
    /// <see cref="SiteStreamGrpcClientFactory"/> compares this against the requested
    /// endpoint so a NodeA→NodeB failover flip (or a site address edit) is honoured
    /// rather than served stale from cache.
    /// </summary>
    public virtual string Endpoint { get; } = string.Empty;

    /// <summary>
    /// The HTTP/2 keepalive ping delay actually applied to this client's channel.
    /// Exposed for tests verifying that <see cref="CommunicationOptions"/> is honoured.
    /// </summary>
    internal TimeSpan KeepAlivePingDelay { get; }

    /// <summary>
    /// The HTTP/2 keepalive ping timeout actually applied to this client's channel.
    /// Exposed for tests verifying that <see cref="CommunicationOptions"/> is honoured.
    /// </summary>
    internal TimeSpan KeepAlivePingTimeout { get; }

    /// <summary>
    /// Creates a client with default communication options.
    /// </summary>
    /// <param name="endpoint">The gRPC endpoint address for the site.</param>
    /// <param name="logger">Logger for diagnostics and errors.</param>
    public SiteStreamGrpcClient(string endpoint, ILogger logger)
        : this(endpoint, logger, new CommunicationOptions())
    {
    }

    /// <summary>
    /// Creates a client whose HTTP/2 keepalive is taken from <see cref="CommunicationOptions"/>
    /// rather than hard-coded, satisfying the design doc's "gRPC Connection Keepalive"
    /// section which states these values are configurable.
    /// </summary>
    /// <param name="endpoint">The gRPC endpoint address for the site.</param>
    /// <param name="logger">Logger for diagnostics and errors.</param>
    /// <param name="options">Communication options including keepalive settings.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="endpoint"/> or <paramref name="options"/> is null.
    /// </exception>
    public SiteStreamGrpcClient(string endpoint, ILogger logger, CommunicationOptions options)
    {
        // Fail with a named argument instead of a NullReferenceException on options.*.
        ArgumentNullException.ThrowIfNull(endpoint);
        ArgumentNullException.ThrowIfNull(options);

        Endpoint = endpoint;
        KeepAlivePingDelay = options.GrpcKeepAlivePingDelay;
        KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout;
        _channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
        {
            HttpHandler = new SocketsHttpHandler
            {
                KeepAlivePingDelay = options.GrpcKeepAlivePingDelay,
                KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout,
                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
            }
        });
        _client = new SiteStreamService.SiteStreamServiceClient(_channel);
        _logger = logger;
    }

    /// <summary>
    /// Protected constructor for unit testing without a real gRPC channel.
    /// Allows subclassing for mock implementations.
    /// </summary>
    protected SiteStreamGrpcClient()
    {
    }

    /// <summary>
    /// Protected constructor for unit testing — records the endpoint without
    /// opening a real gRPC channel, so endpoint-aware factory behaviour can be
    /// exercised by test doubles.
    /// </summary>
    /// <param name="endpoint">The gRPC endpoint address for the site.</param>
    protected SiteStreamGrpcClient(string endpoint)
    {
        Endpoint = endpoint;
    }

    /// <summary>
    /// Creates a test-only instance that has no gRPC channel. Used to test
    /// Unsubscribe and Dispose behavior without needing a real endpoint.
    /// </summary>
    internal static SiteStreamGrpcClient CreateForTesting() => new();

    /// <summary>
    /// Registers a CancellationTokenSource for a correlation ID. Test-only.
    /// Unlike <see cref="RegisterSubscription"/>, an existing entry is overwritten
    /// without being cancelled or disposed.
    /// </summary>
    /// <param name="correlationId">Unique identifier for the subscription.</param>
    /// <param name="cts">CancellationTokenSource for managing the subscription lifecycle.</param>
    internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts)
    {
        _subscriptions[correlationId] = cts;
    }

    /// <summary>
    /// Registers a subscription's CancellationTokenSource for a correlation ID.
    /// If an entry already exists for that correlation ID (a reconnect race where two
    /// <see cref="SubscribeAsync"/> calls briefly share an ID), the prior CTS is
    /// cancelled and disposed so it cannot leak. Internal for testability.
    /// </summary>
    /// <remarks>
    /// The swap is a compare-and-swap loop rather than "read, then overwrite": with a
    /// plain overwrite, a CTS registered by another thread between the read and the
    /// write would be replaced without ever being cancelled or disposed.
    /// </remarks>
    /// <param name="correlationId">Unique identifier for the subscription.</param>
    /// <param name="cts">CancellationTokenSource for managing the subscription lifecycle.</param>
    internal void RegisterSubscription(string correlationId, CancellationTokenSource cts)
    {
        while (true)
        {
            if (!_subscriptions.TryGetValue(correlationId, out var prior))
            {
                if (_subscriptions.TryAdd(correlationId, cts))
                    return;

                // Someone else claimed the slot first; retry against their entry.
                continue;
            }

            if (ReferenceEquals(prior, cts))
                return;

            // Only the thread whose TryUpdate succeeds owns (and tears down) the prior CTS.
            if (_subscriptions.TryUpdate(correlationId, cts, prior))
            {
                prior.Cancel();
                prior.Dispose();
                return;
            }
        }
    }

    /// <summary>
    /// Removes the subscription entry for a correlation ID only if the stored CTS is
    /// exactly the one supplied. A racing replacement stream may already own the slot,
    /// in which case this is a no-op. The CTS itself is neither cancelled nor disposed.
    /// Internal for testability.
    /// </summary>
    /// <param name="correlationId">Unique identifier for the subscription.</param>
    /// <param name="cts">CancellationTokenSource to match before removing.</param>
    internal void RemoveSubscription(string correlationId, CancellationTokenSource cts)
    {
        _subscriptions.TryRemove(new KeyValuePair<string, CancellationTokenSource>(correlationId, cts));
    }

    /// <summary>
    /// Opens a server-streaming subscription for a specific instance.
    /// This is a long-running async method; the caller launches it as a background task.
    /// The <paramref name="onEvent"/> callback delivers domain events, and
    /// <paramref name="onError"/> lets the caller handle reconnection.
    /// </summary>
    /// <remarks>
    /// Cancellation (a gRPC <c>Cancelled</c> status, or an
    /// <see cref="OperationCanceledException"/> once this subscription's token is
    /// cancelled) ends the method silently; every other exception — including one
    /// thrown by <paramref name="onEvent"/> — is passed to <paramref name="onError"/>.
    /// </remarks>
    /// <param name="correlationId">Unique identifier for this subscription.</param>
    /// <param name="instanceUniqueName">Unique name of the instance to subscribe to.</param>
    /// <param name="onEvent">Callback invoked for each domain event received from the stream.</param>
    /// <param name="onError">Callback invoked when the subscription encounters an error.</param>
    /// <param name="ct">Cancellation token to stop the subscription.</param>
    /// <exception cref="InvalidOperationException">The client was created without a gRPC channel.</exception>
    public virtual async Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        if (_client is null)
            throw new InvalidOperationException("Cannot subscribe on a test-only client.");

        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Capture the token BEFORE publishing the CTS. Once it is in _subscriptions,
        // Unsubscribe / a replacement registration / Dispose may cancel AND dispose it
        // at any moment, and CancellationTokenSource.Token throws ObjectDisposedException
        // on a disposed source — which would otherwise be reported through onError as a
        // stream failure instead of a cancellation. A captured token stays usable.
        var token = cts.Token;
        RegisterSubscription(correlationId, cts);

        var request = new InstanceStreamRequest
        {
            CorrelationId = correlationId,
            InstanceUniqueName = instanceUniqueName
        };

        try
        {
            using var call = _client.SubscribeInstance(request, cancellationToken: token);
            await foreach (var evt in call.ResponseStream.ReadAllAsync(token))
            {
                var domainEvent = ConvertToDomainEvent(evt);
                if (domainEvent != null)
                    onEvent(domainEvent);
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            // Normal cancellation — not an error
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Same cancellation, surfaced as OperationCanceledException rather than
            // an RpcException — still not an error.
        }
        catch (Exception ex)
        {
            onError(ex);
        }
        finally
        {
            // Remove only our own entry -- a racing reconnect may already own the slot.
            // If we are the one who removed it, nobody else will dispose this CTS, so
            // do it here; otherwise the linked registration on `ct` would leak for
            // every stream that ends on its own or fails.
            if (_subscriptions.TryRemove(
                    new KeyValuePair<string, CancellationTokenSource>(correlationId, cts)))
            {
                cts.Dispose();
            }
        }
    }

    /// <summary>
    /// Cancels an active subscription by correlation ID. No-op when no subscription
    /// is registered under that ID.
    /// </summary>
    /// <param name="correlationId">Unique identifier of the subscription to cancel.</param>
    public virtual void Unsubscribe(string correlationId)
    {
        if (_subscriptions.TryRemove(correlationId, out var cts))
        {
            cts.Cancel();
            cts.Dispose();
        }
    }

    /// <summary>
    /// Converts a proto SiteStreamEvent to the corresponding domain message.
    /// Internal for testability.
    /// </summary>
    /// <param name="evt">The protobuf site stream event to convert.</param>
    /// <returns>The converted domain event, or null if the event type is not recognized.</returns>
    internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
    {
        SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged(
            evt.AttributeChanged.InstanceUniqueName,
            evt.AttributeChanged.AttributePath,
            evt.AttributeChanged.AttributeName,
            evt.AttributeChanged.Value,
            MapQuality(evt.AttributeChanged.Quality),
            evt.AttributeChanged.Timestamp.ToDateTimeOffset()),
        SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged(
            evt.AlarmChanged.InstanceUniqueName,
            evt.AlarmChanged.AlarmName,
            MapAlarmState(evt.AlarmChanged.State),
            evt.AlarmChanged.Priority,
            evt.AlarmChanged.Timestamp.ToDateTimeOffset())
        {
            Level = MapAlarmLevel(evt.AlarmChanged.Level),
            Message = evt.AlarmChanged.Message ?? string.Empty
        },
        _ => null
    };

    /// <summary>
    /// Maps proto Quality enum to domain string. Internal for testability.
    /// </summary>
    /// <param name="quality">The protobuf quality value to map.</param>
    /// <returns>The mapped quality as a string ("Good", "Uncertain", "Bad", or "Unknown").</returns>
    internal static string MapQuality(Quality quality) => quality switch
    {
        Quality.Good => "Good",
        Quality.Uncertain => "Uncertain",
        Quality.Bad => "Bad",
        _ => "Unknown"
    };

    /// <summary>
    /// Maps proto AlarmStateEnum to domain AlarmState. Internal for testability.
    /// </summary>
    /// <param name="state">The protobuf alarm state to map.</param>
    /// <returns>The mapped domain alarm state; unrecognized values map to <see cref="AlarmState.Normal"/>.</returns>
    internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch
    {
        AlarmStateEnum.AlarmStateNormal => AlarmState.Normal,
        AlarmStateEnum.AlarmStateActive => AlarmState.Active,
        _ => AlarmState.Normal
    };

    /// <summary>
    /// Maps proto AlarmLevelEnum to domain AlarmLevel. Internal for testability.
    /// </summary>
    /// <param name="level">The protobuf alarm level to map.</param>
    /// <returns>The mapped domain alarm level; unrecognized values map to <see cref="AlarmLevel.None"/>.</returns>
    internal static AlarmLevel MapAlarmLevel(AlarmLevelEnum level) => level switch
    {
        AlarmLevelEnum.AlarmLevelLow => AlarmLevel.Low,
        AlarmLevelEnum.AlarmLevelLowLow => AlarmLevel.LowLow,
        AlarmLevelEnum.AlarmLevelHigh => AlarmLevel.High,
        AlarmLevelEnum.AlarmLevelHighHigh => AlarmLevel.HighHigh,
        _ => AlarmLevel.None
    };

    /// <summary>
    /// Releases all subscription CancellationTokenSources and the underlying
    /// gRPC channel. All teardown here is synchronous (CTS disposal and
    /// <see cref="GrpcChannel.Dispose"/>), so a synchronous <see cref="Dispose"/>
    /// can release everything without sync-over-async blocking.
    /// </summary>
    /// <remarks>
    /// Each entry is taken out with <c>TryRemove</c> before it is touched, so a CTS
    /// that a concurrent <see cref="Unsubscribe"/> or stream completion already
    /// removed (and disposed) is skipped instead of being cancelled a second time.
    /// Safe to call more than once.
    /// </remarks>
    private void ReleaseResources()
    {
        foreach (var correlationId in _subscriptions.Keys)
        {
            if (_subscriptions.TryRemove(correlationId, out var cts))
            {
                cts.Cancel();
                cts.Dispose();
            }
        }

        _channel?.Dispose();
    }

    /// <summary>
    /// Asynchronously disposes of the gRPC client and all subscriptions.
    /// The work itself is synchronous; the returned task is already completed.
    /// </summary>
    public virtual ValueTask DisposeAsync()
    {
        ReleaseResources();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Synchronous disposal. All resources held by this client are released
    /// synchronously, so callers (e.g. <see cref="SiteStreamGrpcClientFactory.Dispose"/>)
    /// need not block on the async disposal path.
    /// </summary>
    public virtual void Dispose()
    {
        ReleaseResources();
    }
}
@@ -0,0 +1,134 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// Caches one <see cref="SiteStreamGrpcClient"/> per site identifier.
/// The DebugStreamBridgeActor uses this factory to obtain (or create) a
/// gRPC client for a given site before opening a streaming subscription.
/// </summary>
public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
{
    private readonly ConcurrentDictionary<string, SiteStreamGrpcClient> _clients = new();
    private readonly ILoggerFactory _loggerFactory;
    private readonly CommunicationOptions _options;

    /// <summary>
    /// Test/default constructor — uses default <see cref="CommunicationOptions"/>.
    /// </summary>
    /// <param name="loggerFactory">Logger factory passed to created clients.</param>
    public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory)
        : this(loggerFactory, Options.Create(new CommunicationOptions()))
    {
    }

    /// <summary>
    /// DI constructor — flows <see cref="CommunicationOptions"/> into every created
    /// <see cref="SiteStreamGrpcClient"/> so the configured gRPC keepalive settings
    /// are applied rather than hard-coded defaults.
    /// </summary>
    /// <param name="loggerFactory">Logger factory passed to created clients.</param>
    /// <param name="options">Communication options applied to each created client.</param>
    public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory, IOptions<CommunicationOptions> options)
    {
        _loggerFactory = loggerFactory;
        _options = options.Value;
    }

    /// <summary>
    /// Returns the cached client for the site, or creates a new one. If a client is
    /// already cached but bound to a *different* <paramref name="grpcEndpoint"/> — the
    /// NodeA→NodeB failover flip, or a site whose gRPC address was edited — the stale
    /// client is disposed and replaced with one bound to the requested endpoint.
    /// Communication-012/013: keying purely by site identifier and ignoring the
    /// endpoint on a cache hit defeated debug-stream node failover and meant a
    /// corrected gRPC address never took effect without a central restart.
    /// </summary>
    /// <remarks>
    /// Installation is a compare-and-swap loop (<c>TryAdd</c>/<c>TryUpdate</c>) rather
    /// than <c>AddOrUpdate</c>: the delegates passed to <c>AddOrUpdate</c> run outside
    /// the dictionary's lock and may run more than once under contention, which would
    /// leave a client created by a losing attempt undisposed. Here a client that loses
    /// the race is disposed immediately, and a stale client is disposed only by the
    /// thread that actually replaced it.
    /// </remarks>
    /// <param name="siteIdentifier">Unique site identifier used as the cache key.</param>
    /// <param name="grpcEndpoint">gRPC endpoint the returned client must be bound to.</param>
    /// <returns>A client cached under <paramref name="siteIdentifier"/> and bound to <paramref name="grpcEndpoint"/>.</returns>
    public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        while (true)
        {
            if (_clients.TryGetValue(siteIdentifier, out var existing))
            {
                // Fast path: a client is cached and already bound to the requested endpoint.
                if (string.Equals(existing.Endpoint, grpcEndpoint, StringComparison.Ordinal))
                    return existing;

                // Cached client points at a different endpoint: swap it out, but only
                // if the slot still holds the exact client we inspected.
                var replacement = CreateClient(grpcEndpoint);
                if (_clients.TryUpdate(siteIdentifier, replacement, existing))
                {
                    existing.Dispose();
                    return replacement;
                }

                // Another thread changed the slot first; discard ours and re-evaluate.
                replacement.Dispose();
                continue;
            }

            var created = CreateClient(grpcEndpoint);
            if (_clients.TryAdd(siteIdentifier, created))
                return created;

            // Another thread added a client first; discard ours and re-evaluate.
            created.Dispose();
        }
    }

    /// <summary>
    /// Creates a single <see cref="SiteStreamGrpcClient"/>. Overridable so tests
    /// can substitute a tracking client while still exercising the factory's real
    /// caching and disposal machinery.
    /// </summary>
    /// <param name="grpcEndpoint">gRPC endpoint the new client will connect to.</param>
    /// <returns>A new client bound to <paramref name="grpcEndpoint"/>.</returns>
    protected virtual SiteStreamGrpcClient CreateClient(string grpcEndpoint)
    {
        var logger = _loggerFactory.CreateLogger<SiteStreamGrpcClient>();
        return new SiteStreamGrpcClient(grpcEndpoint, logger, _options);
    }

    /// <summary>
    /// Removes and disposes the client for the given site. Site *address changes* are
    /// now handled transparently by <see cref="GetOrCreate"/> (it disposes and recreates
    /// a client whose endpoint no longer matches). This method remains the disposal
    /// path for full site *removal* — call it when a site record is deleted so its
    /// cached gRPC client does not linger for the life of the process.
    /// </summary>
    /// <param name="siteIdentifier">Unique site identifier whose client should be removed.</param>
    public async Task RemoveSiteAsync(string siteIdentifier)
    {
        if (_clients.TryRemove(siteIdentifier, out var client))
        {
            await client.DisposeAsync();
        }
    }

    /// <summary>
    /// Asynchronously disposes all cached clients and empties the cache.
    /// Each client is removed from the cache before it is disposed, so a client is
    /// never dropped from the cache without being disposed.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        foreach (var siteIdentifier in _clients.Keys)
        {
            if (_clients.TryRemove(siteIdentifier, out var client))
            {
                await client.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Synchronous disposal. Communication-007: this used to block on
    /// <c>DisposeAsync().AsTask().GetAwaiter().GetResult()</c> (sync-over-async,
    /// a stall/deadlock risk during host shutdown). Each
    /// <see cref="SiteStreamGrpcClient"/> releases all of its resources
    /// synchronously, so we dispose them directly with no async path.
    /// Each client is removed from the cache before it is disposed.
    /// </summary>
    public void Dispose()
    {
        foreach (var siteIdentifier in _clients.Keys)
        {
            if (_clients.TryRemove(siteIdentifier, out var client))
            {
                client.Dispose();
            }
        }
    }
}
@@ -0,0 +1,543 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
using Akka.Actor;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using GrpcStatus = Grpc.Core.Status;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// <summary>
/// gRPC service that accepts instance stream subscriptions from central nodes.
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
/// through a Channel&lt;T&gt; to the gRPC response stream.
/// </summary>
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
{
private readonly ISiteStreamSubscriber _streamSubscriber;
private ActorSystem? _actorSystem;
private readonly ILogger<SiteStreamGrpcServer> _logger;
private readonly ConcurrentDictionary<string, StreamEntry> _activeStreams = new();
private readonly int _maxConcurrentStreams;
private readonly TimeSpan _maxStreamLifetime;
private volatile bool _ready;
// Host-017 / REQ-HOST-7: flipped by CancelAllStreams() when the host enters
// CoordinatedShutdown so SubscribeInstance refuses new streams with
// Unavailable before the actor system tears down. Strictly monotonic — once
// true, never reset (the server is single-lifetime per host).
private volatile bool _shuttingDown;
private long _actorCounter;
// Audit Log (#23 M2): central-side ingest actor proxy. Set by the host
// after the cluster singleton starts (see Bundle E wiring). When null the
// IngestAuditEvents RPC replies with an empty IngestAck so sites can
// safely retry — wiring-incomplete is treated as transient, never fatal.
private IActorRef? _auditIngestActor;
// Per Bundle D's brief — Ask timeout is 30 s. The ingest actor's repo
// calls are sub-100 ms in steady state; a generous timeout absorbs a slow
// MSSQL connection without surfacing as a gRPC failure on a healthy site.
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
// Audit Log (#23 M6): site-local queue handed in by AkkaHostedService on
// site roles so the central reconciliation puller's PullAuditEvents RPC
// can read Pending/Forwarded rows. Null when not wired (e.g. central-only
// host or test composing the server in isolation) — the handler treats
// the missing queue as "nothing to ship" and returns an empty response so
// central retries on its next reconciliation cycle.
private ISiteAuditQueue? _siteAuditQueue;
/// <summary>
/// Constructor intended for tests. It is <c>internal</c> so that the DI container
/// only ever sees one public constructor and has no ambiguous choice to make.
/// The per-stream lifetime is fixed at four hours.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="maxConcurrentStreams">The maximum concurrent streams (default 100).</param>
internal SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    int maxConcurrentStreams = 100)
    : this(
        streamSubscriber: streamSubscriber,
        logger: logger,
        maxConcurrentStreams: maxConcurrentStreams,
        maxStreamLifetime: TimeSpan.FromHours(4))
{
}
/// <summary>
/// Constructor used by DI. Reads
/// <see cref="CommunicationOptions.GrpcMaxConcurrentStreams"/> and
/// <see cref="CommunicationOptions.GrpcMaxStreamLifetime"/> from the supplied
/// options, so the documented concurrency limit and the 4-hour zombie-stream
/// session timeout come from configuration instead of being hard-coded.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="options">Communication options containing stream limits and timeouts.</param>
public SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    IOptions<CommunicationOptions> options)
    : this(
        streamSubscriber: streamSubscriber,
        logger: logger,
        maxConcurrentStreams: options.Value.GrpcMaxConcurrentStreams,
        maxStreamLifetime: options.Value.GrpcMaxStreamLifetime)
{
}
/// <summary>
/// Shared constructor that both the test and DI constructors delegate to; it only
/// stores its arguments.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="maxConcurrentStreams">Upper bound on simultaneously active streams.</param>
/// <param name="maxStreamLifetime">Maximum lifetime of a single stream.</param>
private SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    int maxConcurrentStreams,
    TimeSpan maxStreamLifetime)
{
    (_streamSubscriber, _logger) = (streamSubscriber, logger);
    (_maxConcurrentStreams, _maxStreamLifetime) = (maxConcurrentStreams, maxStreamLifetime);
}
/// <summary>
/// Marks the server as ready to accept subscriptions and injects the ActorSystem.
/// Called after the site runtime actor system is fully initialized.
/// The ActorSystem is set here rather than via the constructor so that
/// the gRPC server can be created by DI before the actor system exists.
/// </summary>
/// <param name="actorSystem">The initialized Akka actor system.</param>
public void SetReady(ActorSystem actorSystem)
{
// Order matters: store the actor system first, then flip the volatile _ready
// flag, so a handler that observes _ready == true also sees _actorSystem.
_actorSystem = actorSystem;
_ready = true;
}
/// <summary>
/// Supplies the central-side <c>AuditLogIngestActor</c> proxy that the
/// <see cref="IngestAuditEvents"/> RPC routes incoming site batches to.
/// Audit Log (#23) M2 wiring point — mirrors the way
/// <c>CommunicationService.SetNotificationOutbox</c> takes the Notification
/// Outbox singleton proxy. Bundle E supplies the actor after the cluster
/// singleton starts.
/// </summary>
/// <param name="proxy">The audit log ingest actor proxy.</param>
public void SetAuditIngestActor(IActorRef proxy) => _auditIngestActor = proxy;
/// <summary>
/// Supplies the site-local <see cref="ISiteAuditQueue"/> (the same
/// <c>SqliteAuditWriter</c> singleton that backs <see cref="IAuditWriter"/>
/// on the script thread) so the M6 <see cref="PullAuditEvents"/> RPC can serve
/// central's reconciliation pulls. Like <see cref="SetAuditIngestActor"/>, it is
/// wired after construction because the queue and the gRPC server are both DI
/// singletons brought up in independent orders on site startup.
/// </summary>
/// <param name="queue">The site audit queue for serving reconciliation pulls.</param>
public void SetSiteAuditQueue(ISiteAuditQueue queue) => _siteAuditQueue = queue;
/// <summary>
/// Host-017 / REQ-HOST-7: starts the gRPC server's part of the site shutdown
/// sequence. After this call <see cref="SubscribeInstance"/> refuses new streams
/// with <see cref="StatusCode.Unavailable"/>, and every active stream's
/// cancellation source is cancelled so its <c>await foreach</c> observes
/// <see cref="OperationCanceledException"/> and the response stream
/// completes with <c>Cancelled</c> on the client. Idempotent — safe to call
/// more than once. Invoked from the site host's
/// <c>IHostApplicationLifetime.ApplicationStopping</c> callback BEFORE
/// Akka's <c>CoordinatedShutdown</c> runs, so in-flight clients get a
/// clean cancellation they can reconnect on rather than a silent stream
/// that only times out via gRPC keepalive.
/// </summary>
public void CancelAllStreams()
{
    // Raise the flag before cancelling so no new stream is admitted meanwhile.
    _shuttingDown = true;

    foreach (var activeStream in _activeStreams.Values)
    {
        CancelQuietly(activeStream);
    }

    static void CancelQuietly(StreamEntry stream)
    {
        try
        {
            stream.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Already cleaned up by its own finally — nothing to do.
        }
    }
}
/// <summary>
/// Host-017: whether <see cref="CancelAllStreams"/> has been called. Exposed for
/// test assertions on the shutdown state.
/// </summary>
internal bool IsShuttingDown
{
    get { return _shuttingDown; }
}

/// <summary>
/// Count of streaming subscriptions currently registered as active. Exposed for diagnostics.
/// </summary>
public int ActiveStreamCount
{
    get { return _activeStreams.Count; }
}

/// <summary>The max concurrent stream limit in effect. Exposed for tests.</summary>
internal int MaxConcurrentStreams
{
    get { return _maxConcurrentStreams; }
}

/// <summary>The per-stream session lifetime in effect. Exposed for tests.</summary>
internal TimeSpan MaxStreamLifetime
{
    get { return _maxStreamLifetime; }
}
/// <inheritdoc />
/// <remarks>
/// Rejects the call with <see cref="StatusCode.Unavailable"/> while the server is
/// not ready or is shutting down, with <see cref="StatusCode.InvalidArgument"/> for
/// an unusable correlation id, and with <see cref="StatusCode.ResourceExhausted"/>
/// when the concurrent-stream limit is reached. Otherwise it replaces any stream
/// already registered under the same correlation id, creates a relay actor, and
/// pumps events from the relay's channel to <paramref name="responseStream"/> until
/// the stream's token is cancelled.
/// </remarks>
/// <param name="request">The subscription request (correlation id and instance name).</param>
/// <param name="responseStream">The gRPC response stream events are written to.</param>
/// <param name="context">The server call context.</param>
public override async Task SubscribeInstance(
    InstanceStreamRequest request,
    IServerStreamWriter<SiteStreamEvent> responseStream,
    ServerCallContext context)
{
    if (!_ready)
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));

    // Host-017 / REQ-HOST-7: refuse new subscriptions during shutdown so
    // CoordinatedShutdown can quiesce without racing fresh streams.
    if (_shuttingDown)
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));

    // Communication-014: correlation_id arrives off the wire on a public gRPC
    // endpoint and is used (below) to compose an Akka actor name. Akka actor names
    // have a restricted character set — an id containing '/', whitespace, or other
    // disallowed characters would make ActorOf throw InvalidActorNameException,
    // escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front.
    if (string.IsNullOrEmpty(request.CorrelationId) ||
        !ActorPath.IsValidPathElement(request.CorrelationId))
    {
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "correlation_id is missing or not a valid identifier"));
    }

    // Duplicate prevention -- cancel existing stream for this correlationId
    if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
    {
        existingEntry.Cts.Cancel();
        existingEntry.Cts.Dispose();
    }

    // Check max concurrent streams after duplicate removal
    if (_activeStreams.Count >= _maxConcurrentStreams)
        throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));

    using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

    // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer
    // of dead-client detection) — forces a long-lived zombie stream to terminate
    // even if keepalive PINGs never detect the loss.
    if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan)
        streamCts.CancelAfter(_maxStreamLifetime);

    // Capture the token once, before the CTS becomes reachable through
    // _activeStreams. A later call with the same correlation id cancels AND
    // disposes this CTS (duplicate prevention above), and reading
    // CancellationTokenSource.Token on a disposed source throws
    // ObjectDisposedException — which would escape the OperationCanceledException
    // handler below as an RPC fault. The captured token stays valid.
    var streamToken = streamCts.Token;

    var entry = new StreamEntry(streamCts);
    _activeStreams[request.CorrelationId] = entry;

    // Re-check shutdown now that the stream is registered. CancelAllStreams sets the
    // flag and then cancels whatever is registered at that moment; a stream added
    // just after that pass would otherwise never be cancelled.
    if (_shuttingDown)
    {
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));
    }

    var channel = Channel.CreateBounded<SiteStreamEvent>(
        new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest });

    var actorSeq = Interlocked.Increment(ref _actorCounter);
    var relayActor = _actorSystem!.ActorOf(
        Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
        $"stream-relay-{request.CorrelationId}-{actorSeq}");

    // Communication-021: the previous code called _streamSubscriber.Subscribe
    // OUTSIDE the try block that owns relay-actor cleanup. If Subscribe threw
    // (stale instance name, index lookup fault, site runtime shutting down),
    // the freshly-created relay actor, the _activeStreams entry, the
    // StreamEntry.Cts, and the Channel<SiteStreamEvent> all leaked because the
    // finally never ran. Wrap Subscribe in its own try so any throw deterministically
    // stops the relay actor, removes the activeStreams entry, and completes the
    // channel before the RpcException escapes to the caller.
    string subscriptionId;
    try
    {
        subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Subscribe failed for {Instance} (correlation {CorrelationId}); cleaning up relay actor.",
            request.InstanceUniqueName, request.CorrelationId);
        _actorSystem!.Stop(relayActor);
        channel.Writer.TryComplete();
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
        throw;
    }

    _logger.LogInformation(
        "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
        request.CorrelationId, request.InstanceUniqueName, subscriptionId);

    try
    {
        await foreach (var evt in channel.Reader.ReadAllAsync(streamToken))
        {
            await responseStream.WriteAsync(evt, streamToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal cancellation (client disconnect or duplicate replacement)
    }
    finally
    {
        _streamSubscriber.RemoveSubscriber(relayActor);
        _actorSystem!.Stop(relayActor);
        channel.Writer.TryComplete();

        // Only remove our own entry -- a replacement stream may have already taken the slot
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));

        _logger.LogInformation(
            "Stream {CorrelationId} for {Instance} ended",
            request.CorrelationId, request.InstanceUniqueName);
    }
}
/// <summary>
/// Audit Log (#23) M2 site→central push RPC. Converts the incoming batch into
/// <see cref="AuditEvent"/> rows, Asks the central <c>AuditLogIngestActor</c>
/// proxy to persist them, and returns the accepted EventIds so the site
/// can flip its local rows to <c>Forwarded</c>.
/// </summary>
/// <remarks>
/// <para>
/// DTO→entity conversion goes through the shared <see cref="AuditEventDtoMapper"/>
/// (hosted in <c>ZB.MOM.WW.ScadaBridge.Communication</c> so both this server and
/// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> share one implementation without a
/// project-reference cycle).
/// </para>
/// <para>
/// An empty <see cref="IngestAck"/> is returned in three situations: the batch is
/// empty, <see cref="_auditIngestActor"/> has not been wired yet (host startup
/// race window), or the Ask fails. In the latter two the site treats the missing
/// ack as a transient outcome and retries on the next drain, which is the desired
/// idempotent behaviour.
/// </para>
/// </remarks>
/// <inheritdoc />
/// <param name="request">The audit event batch to ingest.</param>
/// <param name="context">The server call context.</param>
public override async Task<IngestAck> IngestAuditEvents(
    AuditEventBatch request,
    ServerCallContext context)
{
    var eventCount = request.Events.Count;

    // Nothing to ingest: answer right away so the client moves on.
    if (eventCount == 0)
    {
        return new IngestAck();
    }

    if (_auditIngestActor is not { } ingestActor)
    {
        // Wiring incomplete (host startup race). Sites treat an empty
        // ack as "nothing was acked, leave rows Pending, retry next
        // drain" — exactly the right behaviour during host bring-up.
        _logger.LogWarning(
            "IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.",
            request.Events.Count);
        return new IngestAck();
    }

    var auditEvents = new List<AuditEvent>(eventCount);
    foreach (var eventDto in request.Events)
    {
        var auditEvent = AuditEventDtoMapper.FromDto(eventDto);
        auditEvents.Add(auditEvent);
    }

    IngestAuditEventsReply ingestReply;
    try
    {
        ingestReply = await ingestActor.Ask<IngestAuditEventsReply>(
            new IngestAuditEventsCommand(auditEvents),
            AuditIngestAskTimeout,
            context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Audit ingest is best-effort; failing this RPC at the gRPC layer
        // would surface as a transport error and force the site to retry
        // (which it would do anyway). Logging + an empty ack keeps the
        // semantics consistent with the "wiring incomplete" path above.
        _logger.LogError(ex,
            "AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.",
            request.Events.Count);
        return new IngestAck();
    }

    // Echo the accepted ids back in their string form.
    var response = new IngestAck();
    foreach (var acceptedId in ingestReply.AcceptedEventIds)
    {
        response.AcceptedEventIds.Add(acceptedId.ToString());
    }

    return response;
}
/// <summary>
/// Audit Log (#23) M3 site→central combined-telemetry push RPC. Converts each
/// <see cref="CachedTelemetryPacket"/> in the batch into a matched
/// (AuditEvent, SiteCall) pair, Asks the central <c>AuditLogIngestActor</c>
/// proxy to persist them in dual-write transactions, and returns the
/// AuditEvent EventIds that committed so the site can flip its local
/// rows to <c>Forwarded</c>.
/// </summary>
/// <remarks>
/// Uses the same fallback as <see cref="IngestAuditEvents"/>: an empty batch, an
/// unset actor proxy, or a failed Ask all produce an empty ack, so sites treat the
/// outcome as transient and retry, never a hard fault.
/// </remarks>
/// <inheritdoc />
/// <param name="request">The cached telemetry batch to ingest.</param>
/// <param name="context">The server call context.</param>
public override async Task<IngestAck> IngestCachedTelemetry(
    CachedTelemetryBatch request,
    ServerCallContext context)
{
    var packetCount = request.Packets.Count;
    if (packetCount == 0)
    {
        return new IngestAck();
    }

    if (_auditIngestActor is not { } ingestActor)
    {
        _logger.LogWarning(
            "IngestCachedTelemetry received {Count} packets before SetAuditIngestActor was called; returning empty ack.",
            request.Packets.Count);
        return new IngestAck();
    }

    // Pair up the audit event and the operational row carried by each packet.
    var telemetryEntries = new List<CachedTelemetryEntry>(packetCount);
    foreach (var telemetryPacket in request.Packets)
    {
        var mappedEvent = AuditEventDtoMapper.FromDto(telemetryPacket.AuditEvent);
        var mappedCall = SiteCallDtoMapper.FromDto(telemetryPacket.Operational);
        telemetryEntries.Add(new CachedTelemetryEntry(mappedEvent, mappedCall));
    }

    IngestCachedTelemetryReply ingestReply;
    try
    {
        ingestReply = await ingestActor.Ask<IngestCachedTelemetryReply>(
            new IngestCachedTelemetryCommand(telemetryEntries),
            AuditIngestAskTimeout,
            context.CancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "AuditLogIngestActor Ask failed for combined telemetry batch of {Count} packets; returning empty ack.",
            request.Packets.Count);
        return new IngestAck();
    }

    // Echo the committed ids back in their string form.
    var response = new IngestAck();
    foreach (var acceptedId in ingestReply.AcceptedEventIds)
    {
        response.AcceptedEventIds.Add(acceptedId.ToString());
    }

    return response;
}
/// <summary>
/// Audit Log (#23) M6 reconciliation pull RPC. Central asks the site for
/// AuditLog rows whose <c>OccurredAtUtc &gt;= since_utc</c> that are still
/// <c>Pending</c> or <c>Forwarded</c>, at most <c>batch_size</c> per call.
/// The handler reads the rows from the site audit queue, projects them into
/// the response, and then asks the queue to flip them to
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AuditForwardState.Reconciled"/>.
/// </summary>
/// <remarks>
/// <para>
/// When <see cref="_siteAuditQueue"/> is not wired (central-only host or a
/// composition-root test exercising the server in isolation) the RPC returns
/// an empty response — central treats that as "nothing to ship" and retries
/// on its next cycle. A failed read is handled the same way.
/// </para>
/// <para>
/// The flip runs after the rows have been copied into the response message
/// but before this method returns, i.e. before the response is actually
/// sent. It is best-effort: a failure is logged as a warning and the
/// response is still returned, so the rows keep their previous state and can
/// be pulled again.
/// </para>
/// <para>
/// NOTE(review): because the flip precedes the send, a transport failure
/// after a successful flip would leave rows marked Reconciled that central
/// never received — confirm central has a recovery path for that window.
/// </para>
/// </remarks>
/// <inheritdoc />
/// <param name="request">The pull request with time bounds and batch size.</param>
/// <param name="context">The server call context.</param>
public override async Task<PullAuditEventsResponse> PullAuditEvents(
    PullAuditEventsRequest request,
    ServerCallContext context)
{
    // Single read of the field; bail out early while the queue is unwired.
    if (_siteAuditQueue is not { } auditQueue)
    {
        _logger.LogWarning(
            "PullAuditEvents invoked before SetSiteAuditQueue was called; returning empty response.");
        return new PullAuditEventsResponse();
    }

    var batchSize = request.BatchSize;
    if (batchSize < 1)
    {
        // Reject malformed requests up front with InvalidArgument rather
        // than letting the storage layer fail with a less specific error.
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "batch_size must be > 0"));
    }

    // An absent since_utc means "from the beginning of recorded history",
    // which is what the very first reconciliation cycle needs.
    var sinceUtc = request.SinceUtc is { } sinceTimestamp
        ? sinceTimestamp.ToDateTime().ToUniversalTime()
        : DateTime.MinValue;

    IReadOnlyList<AuditEvent> rows;
    try
    {
        rows = await auditQueue.ReadPendingSinceAsync(
            sinceUtc, batchSize, context.CancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "ReadPendingSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
            sinceUtc, batchSize);
        return new PullAuditEventsResponse();
    }

    var response = new PullAuditEventsResponse();

    // A saturated batch tells central to issue a follow-up pull. The site
    // does not compute the cursor; central advances it from the rows it got.
    response.MoreAvailable = rows.Count >= batchSize;

    // One pass: project each row into the response and remember its id for
    // the reconciliation flip below.
    var reconciledIds = new List<Guid>(rows.Count);
    foreach (var row in rows)
    {
        response.Events.Add(AuditEventDtoMapper.ToDto(row));
        reconciledIds.Add(row.EventId);
    }

    if (reconciledIds.Count == 0)
    {
        return response;
    }

    try
    {
        await auditQueue.MarkReconciledAsync(reconciledIds, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Best-effort: the response is still returned and the rows keep
        // their previous forward state, so they remain eligible for a
        // later pull.
        _logger.LogWarning(ex,
            "MarkReconciledAsync failed after PullAuditEvents response of {Count} rows; rows stay Pending for retry.",
            reconciledIds.Count);
    }

    return response;
}
/// <summary>
/// Tracks a single active stream so cleanup only removes its own entry.
/// </summary>
/// <param name="Cts">
/// The cancellation source stored with the entry. Presumably it cancels the
/// associated stream; its usage is outside this excerpt — confirm.
/// </param>
private sealed record StreamEntry(CancellationTokenSource Cts);
}
@@ -0,0 +1,142 @@
syntax = "proto3";
option csharp_namespace = "ZB.MOM.WW.ScadaBridge.Communication.Grpc";
package sitestream;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto"; // Int32Value
// Streaming and audit-log RPCs exchanged between site and central nodes:
// one server-streaming subscription plus three unary audit-telemetry calls.
service SiteStreamService {
// Server-streaming: returns a stream of SiteStreamEvent messages for the
// instance named in the request.
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
// Unary push of a batch of audit events; the ack lists the accepted event ids.
rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
// Unary push of combined audit + operational packets for cached calls; the
// ack lists the accepted audit event ids.
rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
// Unary reconciliation pull of audit events at or after since_utc, capped
// at batch_size rows per call.
rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}
// Request message for SubscribeInstance.
message InstanceStreamRequest {
// Caller-supplied correlation id. SiteStreamEvent carries a field of the
// same name; presumably the server echoes this value — confirm.
string correlation_id = 1;
// Unique name of the instance to subscribe to.
string instance_unique_name = 2;
}
// One element of the SubscribeInstance response stream. Exactly one member
// of the `event` oneof is set per message (or none, if the sender set neither).
message SiteStreamEvent {
string correlation_id = 1;
oneof event {
// An attribute value update.
AttributeValueUpdate attribute_changed = 2;
// An alarm state update.
AlarmStateUpdate alarm_changed = 3;
}
}
// Quality flag attached to an attribute value (see AttributeValueUpdate).
// QUALITY_UNSPECIFIED is the zero value, so it is what a receiver reads when
// the sender did not set the field.
enum Quality {
QUALITY_UNSPECIFIED = 0;
QUALITY_GOOD = 1;
QUALITY_UNCERTAIN = 2;
QUALITY_BAD = 3;
}
// State of an alarm as reported in AlarmStateUpdate.
// ALARM_STATE_UNSPECIFIED is the zero value, i.e. what a receiver reads when
// the sender did not set the field.
enum AlarmStateEnum {
ALARM_STATE_UNSPECIFIED = 0;
ALARM_STATE_NORMAL = 1;
ALARM_STATE_ACTIVE = 2;
}
// Severity level for an active alarm. Binary trigger types (ValueMatch,
// RangeViolation, RateOfChange) always emit ALARM_LEVEL_NONE. The HiLo
// trigger type emits one of the directional values.
// ALARM_LEVEL_NONE is the zero value, so an unset field also reads as NONE.
enum AlarmLevelEnum {
ALARM_LEVEL_NONE = 0;
ALARM_LEVEL_LOW = 1;
ALARM_LEVEL_LOW_LOW = 2;
ALARM_LEVEL_HIGH = 3;
ALARM_LEVEL_HIGH_HIGH = 4;
}
// Payload of SiteStreamEvent.attribute_changed: a single attribute value
// together with its quality and timestamp.
message AttributeValueUpdate {
string instance_unique_name = 1;
string attribute_path = 2;
string attribute_name = 3;
// The value is carried as a string; how non-string values are encoded is
// not defined in this file.
string value = 4;
Quality quality = 5;
google.protobuf.Timestamp timestamp = 6;
}
// Payload of SiteStreamEvent.alarm_changed: the current state of one alarm
// on one instance.
message AlarmStateUpdate {
string instance_unique_name = 1;
string alarm_name = 2;
AlarmStateEnum state = 3;
int32 priority = 4;
google.protobuf.Timestamp timestamp = 5;
AlarmLevelEnum level = 6; // ALARM_LEVEL_NONE for binary trigger types; set by HiLo.
string message = 7; // Optional per-band operator message; empty when unset.
}
// Audit Log (#23) telemetry: single lifecycle event ferried from a site SQLite
// hot-path row to central via IngestAuditEvents. Mirrors AuditEvent (Commons)
// minus the site-local ForwardState and the central IngestedAtUtc (set on ingest).
// The same message is also embedded in CachedTelemetryPacket and returned by
// PullAuditEvents.
message AuditEventDto {
// Event identifier as a string; IngestAck echoes accepted ids in string form.
string event_id = 1;
google.protobuf.Timestamp occurred_at_utc = 2;
string channel = 3;
string kind = 4;
string correlation_id = 5; // empty string represents null
string source_site_id = 6;
string source_instance_id = 7;
string source_script = 8;
string actor = 9;
string target = 10;
string status = 11;
google.protobuf.Int32Value http_status = 12; // null when absent
// Wrapper type, so the value can likewise be absent.
google.protobuf.Int32Value duration_ms = 13;
string error_message = 14;
string error_detail = 15;
string request_summary = 16;
string response_summary = 17;
bool payload_truncated = 18;
string extra = 19;
string execution_id = 20; // empty string represents null
string parent_execution_id = 21; // empty string represents null
string source_node = 22; // empty string represents null
}
// Request of IngestAuditEvents: the audit events to ingest in one call.
message AuditEventBatch { repeated AuditEventDto events = 1; }
// Response of IngestAuditEvents and IngestCachedTelemetry: the ids of the
// events that were accepted. An empty list means nothing was accepted.
message IngestAck { repeated string accepted_event_ids = 1; }
// Audit Log (#23) M3 cached-call combined telemetry: a single packet carries
// both the AuditEvent row to insert and the SiteCalls operational-state upsert
// for one lifecycle event of a cached outbound call. Central writes both rows
// in one MS SQL transaction so the audit and operational mirrors never drift.
//
// SiteCallOperationalDto is the operational-state half of that packet (see
// CachedTelemetryPacket.operational).
message SiteCallOperationalDto {
string tracked_operation_id = 1; // GUID string ("D" format)
string channel = 2; // "ApiOutbound" | "DbOutbound"
string target = 3;
string source_site = 4;
string status = 5; // AuditStatus name
int32 retry_count = 6;
string last_error = 7; // empty when null
// Wrapper type, so the value can be absent.
google.protobuf.Int32Value http_status = 8;
google.protobuf.Timestamp created_at_utc = 9;
google.protobuf.Timestamp updated_at_utc = 10;
google.protobuf.Timestamp terminal_at_utc = 11; // absent when not terminal
string source_node = 12; // empty string represents null
}
// One combined-telemetry packet: the audit event plus the matching
// operational-state record for the same cached call.
message CachedTelemetryPacket {
AuditEventDto audit_event = 1;
SiteCallOperationalDto operational = 2;
}
// Request of IngestCachedTelemetry: the packets to ingest in one call.
message CachedTelemetryBatch { repeated CachedTelemetryPacket packets = 1; }
// Audit Log (#23) M6 reconciliation pull: central→site request for any
// site-local AuditLog rows with OccurredAtUtc >= since_utc that have not yet
// been ingested centrally (ForwardState in {Pending, Forwarded}). The site
// flips returned rows to Reconciled once they have been copied into the
// response message; the flip is attempted before the handler returns, i.e.
// before the response is actually sent, and is best-effort.
// more_available signals batch_size was saturated so the caller knows to
// issue a follow-up pull with an advanced since_utc cursor.
message PullAuditEventsRequest {
// Lower bound (inclusive). When unset the server pulls from the beginning
// of recorded history.
google.protobuf.Timestamp since_utc = 1;
// Maximum number of rows to return; the server rejects values <= 0 with
// InvalidArgument.
int32 batch_size = 2;
}
// Response of PullAuditEvents. An empty response (no events, more_available
// false) is also what the server returns when it cannot serve the pull.
message PullAuditEventsResponse {
// The rows read for this pull, at most batch_size of them.
repeated AuditEventDto events = 1;
// True when the number of returned rows reached batch_size.
bool more_available = 2;
}
@@ -0,0 +1,30 @@
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication;
/// <summary>
/// Dependency-injection registration helpers for the communication layer.
/// </summary>
public static class ServiceCollectionExtensions
{
    /// <summary>
    /// Binds <see cref="CommunicationOptions"/> to the <c>Communication</c>
    /// configuration section and registers <see cref="CommunicationService"/>,
    /// the gRPC client factory and the debug stream service as singletons.
    /// </summary>
    /// <param name="services">The DI service collection to register services into.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddCommunication(this IServiceCollection services)
    {
        services
            .AddOptions<CommunicationOptions>()
            .BindConfiguration("Communication");

        // AddSingleton returns the collection it was called on, so the chain
        // ends with the same instance the caller passed in.
        return services
            .AddSingleton<CommunicationService>()
            .AddSingleton<SiteStreamGrpcClientFactory>()
            .AddSingleton<DebugStreamService>();
    }

    /// <summary>
    /// Extension point for extra DI registrations required by the communication
    /// actors. Currently registers nothing: the actors themselves are created
    /// in <c>AkkaHostedService.RegisterCentralActors</c> /
    /// <c>RegisterSiteActors</c>.
    /// </summary>
    /// <param name="services">The DI service collection to register services into.</param>
    /// <returns>The same <paramref name="services"/> instance, unchanged.</returns>
    public static IServiceCollection AddCommunicationActors(this IServiceCollection services)
        => services;
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,263 @@
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/sitestream.proto
// </auto-generated>
#pragma warning disable 0414, 1591, 8981, 0612
#region Designer generated code
using grpc = global::Grpc.Core;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
public static partial class SiteStreamService
{
static readonly string __ServiceName = "sitestream.SiteStreamService";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (message is global::Google.Protobuf.IBufferMessage)
{
context.SetPayloadLength(message.CalculateSize());
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
context.Complete();
return;
}
#endif
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static class __Helper_MessageCache<T>
{
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (__Helper_MessageCache<T>.IsBufferMessage)
{
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
}
#endif
return parser.ParseFrom(context.PayloadAsNewBuffer());
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest> __Marshaller_sitestream_InstanceStreamRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent> __Marshaller_sitestream_SiteStreamEvent = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch> __Marshaller_sitestream_AuditEventBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> __Marshaller_sitestream_IngestAck = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch> __Marshaller_sitestream_CachedTelemetryBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest> __Marshaller_sitestream_PullAuditEventsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse> __Marshaller_sitestream_PullAuditEventsResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent> __Method_SubscribeInstance = new grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent>(
grpc::MethodType.ServerStreaming,
__ServiceName,
"SubscribeInstance",
__Marshaller_sitestream_InstanceStreamRequest,
__Marshaller_sitestream_SiteStreamEvent);
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> __Method_IngestAuditEvents = new grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck>(
grpc::MethodType.Unary,
__ServiceName,
"IngestAuditEvents",
__Marshaller_sitestream_AuditEventBatch,
__Marshaller_sitestream_IngestAck);
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> __Method_IngestCachedTelemetry = new grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck>(
grpc::MethodType.Unary,
__ServiceName,
"IngestCachedTelemetry",
__Marshaller_sitestream_CachedTelemetryBatch,
__Marshaller_sitestream_IngestAck);
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse> __Method_PullAuditEvents = new grpc::Method<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse>(
grpc::MethodType.Unary,
__ServiceName,
"PullAuditEvents",
__Marshaller_sitestream_PullAuditEventsRequest,
__Marshaller_sitestream_PullAuditEventsResponse);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SitestreamReflection.Descriptor.Services[0]; }
}
/// <summary>Base class for server-side implementations of SiteStreamService</summary>
[grpc::BindServiceMethod(typeof(SiteStreamService), "BindService")]
public abstract partial class SiteStreamServiceBase
{
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task SubscribeInstance(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest request, grpc::IServerStreamWriter<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent> responseStream, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestCachedTelemetry(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse> PullAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Client for SiteStreamService</summary>
public partial class SiteStreamServiceClient : grpc::ClientBase<SiteStreamServiceClient>
{
/// <summary>Creates a new client for SiteStreamService</summary>
/// <param name="channel">The channel to use to make remote calls.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public SiteStreamServiceClient(grpc::ChannelBase channel) : base(channel)
{
}
/// <summary>Creates a new client for SiteStreamService that uses a custom <c>CallInvoker</c>.</summary>
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public SiteStreamServiceClient(grpc::CallInvoker callInvoker) : base(callInvoker)
{
}
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected SiteStreamServiceClient() : base()
{
}
/// <summary>Protected constructor to allow creation of configured clients.</summary>
/// <param name="configuration">The client configuration.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected SiteStreamServiceClient(ClientBaseConfiguration configuration) : base(configuration)
{
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncServerStreamingCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent> SubscribeInstance(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SubscribeInstance(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncServerStreamingCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent> SubscribeInstance(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest request, grpc::CallOptions options)
{
return CallInvoker.AsyncServerStreamingCall(__Method_SubscribeInstance, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck IngestAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestAuditEvents(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck IngestAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_IngestAuditEvents, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestAuditEventsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestAuditEventsAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestAuditEventsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_IngestAuditEvents, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck IngestCachedTelemetry(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestCachedTelemetry(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck IngestCachedTelemetry(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_IngestCachedTelemetry, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestCachedTelemetryAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestCachedTelemetryAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck> IngestCachedTelemetryAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_IngestCachedTelemetry, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse PullAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return PullAuditEvents(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse PullAuditEvents(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_PullAuditEvents, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse> PullAuditEventsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return PullAuditEventsAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse> PullAuditEventsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_PullAuditEvents, null, options, request);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration)
{
return new SiteStreamServiceClient(configuration);
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static grpc::ServerServiceDefinition BindService(SiteStreamServiceBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance)
.AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents)
.AddMethod(__Method_IngestCachedTelemetry, serviceImpl.IngestCachedTelemetry)
.AddMethod(__Method_PullAuditEvents, serviceImpl.PullAuditEvents).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static void BindService(grpc::ServiceBinderBase serviceBinder, SiteStreamServiceBase serviceImpl)
{
serviceBinder.AddMethod(__Method_SubscribeInstance, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.InstanceStreamRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamEvent>(serviceImpl.SubscribeInstance));
serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AuditEventBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck>(serviceImpl.IngestAuditEvents));
serviceBinder.AddMethod(__Method_IngestCachedTelemetry, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.IngestAck>(serviceImpl.IngestCachedTelemetry));
serviceBinder.AddMethod(__Method_PullAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse>(serviceImpl.PullAuditEvents));
}
}
}
#endregion
@@ -0,0 +1,51 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.ScadaBridge.Communication.Tests" />
<InternalsVisibleTo Include="ZB.MOM.WW.ScadaBridge.IntegrationTests" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="Akka.Remote" />
<PackageReference Include="Akka.Cluster" />
<PackageReference Include="Akka.Cluster.Tools" />
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../ZB.MOM.WW.ScadaBridge.Commons/ZB.MOM.WW.ScadaBridge.Commons.csproj" />
<ProjectReference Include="../ZB.MOM.WW.ScadaBridge.HealthMonitoring/ZB.MOM.WW.ScadaBridge.HealthMonitoring.csproj" />
</ItemGroup>
<!-- gRPC proto generation. The compiled C# is checked in under
SiteStreamGrpc/ (Sitestream.cs + SitestreamGrpc.cs) because protoc
segfaults inside our linux_arm64 Docker build image. To regenerate
after schema changes:
1. Temporarily uncomment the Protobuf ItemGroup below.
2. Delete SiteStreamGrpc/*.cs.
3. `dotnet build` (on macOS) — Grpc.Tools writes fresh files to obj/.
4. Copy obj/Debug/net10.0/Protos/*.cs into SiteStreamGrpc/.
5. Re-comment the ItemGroup.
Eventually we should switch the Docker build image to one with a
working protoc on arm64. -->
<!--
<ItemGroup>
<Protobuf Include="Protos\sitestream.proto" GrpcServices="Both" />
</ItemGroup>
-->
</Project>