using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;

namespace ZB.MOM.WW.MxGateway.Server.Sessions;

/// <summary>
/// Gateway-side state for a single client session: the attached worker client, the
/// session lifecycle state, the client-activity lease, the per-session event
/// distributor (plus the optional internal dashboard mirror), and the item
/// registrations recorded from successful command replies.
/// </summary>
public sealed class GatewaySession
{
// Guards the mutable session fields below (state, fault text, lease timestamps,
// subscriber count, distributor/mirror bookkeeping, and the item map).
private readonly object _syncRoot = new();

// Serializes CloseAsync and KillWorkerWithCloseGateAsync. A SemaphoreSlim rather than
// `lock` because the close path awaits worker shutdown while holding it.
private readonly SemaphoreSlim _closeLock = new(1, 1);

// Dependency bundle (event options, distributor logger, clock, mapper, metrics,
// optional dashboard broadcaster) used to build and observe the event distributor.
private readonly SessionEventStreaming _eventStreaming;

// Worker client set by AttachWorkerClient; null until a worker is attached.
private IWorkerClient? _workerClient;

// Current lifecycle state; changes go through TransitionTo/MarkFaulted/close helpers.
private SessionState _state = SessionState.Creating;

// Fault description recorded by MarkFaulted.
private string? _finalFault;

// Last client activity and the lease expiry derived from it (TouchClientActivity/ExtendLease).
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;

// Set once a close attempt has begun (TryBeginClose), even if that attempt later failed.
private bool _closeStarted;

// Number of EXTERNAL event subscribers admitted by AttachEventSubscriber.
private int _activeEventSubscriberCount;

// Lazily-created per-session event distributor and whether its pump was started.
private SessionEventDistributor? _eventDistributor;
private bool _eventDistributorStarted;

// Internal dashboard mirror bookkeeping (StartDashboardMirror).
private bool _dashboardMirrorStarted;
private IEventSubscriberLease? _dashboardMirrorLease;
private Task? _dashboardMirrorTask;
private CancellationTokenSource? _dashboardMirrorCts;

// Item registrations keyed by (server handle, item handle), maintained by TrackCommandReply.
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];

/// <summary>
/// Initializes a gateway session with session metadata and timeout configuration.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
/// <param name="backendName">Name of the backend MXAccess proxy server.</param>
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
/// <param name="nonce">Security nonce for worker validation.</param>
/// <param name="clientIdentity">Client identity from the authentication context.</param>
/// <param name="clientSessionName">Client-supplied session name.</param>
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
/// <param name="commandTimeout">Timeout for command invocation.</param>
/// <param name="startupTimeout">Timeout for worker process startup.</param>
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="openedAt">Timestamp when the session opened.</param>
/// <remarks>
/// Constructs a session with no owner key (<see cref="OwnerKeyId"/> will be null) and a
/// 30-minute lease. Authenticated call sites that have a resolved API key identity must
/// use the overload that takes <c>ownerKeyId</c> and pass the caller's key id explicitly.
/// </remarks>
public GatewaySession(
    string sessionId,
    string backendName,
    string pipeName,
    string nonce,
    string? clientIdentity,
    string? clientSessionName,
    string? clientCorrelationId,
    TimeSpan commandTimeout,
    TimeSpan startupTimeout,
    TimeSpan shutdownTimeout,
    DateTimeOffset openedAt)
    : this(
        sessionId: sessionId,
        backendName: backendName,
        pipeName: pipeName,
        nonce: nonce,
        clientIdentity: clientIdentity,
        ownerKeyId: null,
        clientSessionName: clientSessionName,
        clientCorrelationId: clientCorrelationId,
        commandTimeout: commandTimeout,
        startupTimeout: startupTimeout,
        shutdownTimeout: shutdownTimeout,
        leaseDuration: TimeSpan.FromMinutes(30),
        openedAt: openedAt)
{
}

/// <summary>
/// Initializes a gateway session with session metadata, timeout configuration, an
/// explicit owner key, and a caller-chosen lease duration.
/// </summary>
/// <param name="sessionId">Identifier of the session; must not be null or whitespace.</param>
/// <param name="backendName">Name of the backend MXAccess proxy server; must not be null or whitespace.</param>
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC; must not be null or whitespace.</param>
/// <param name="nonce">Security nonce for worker validation; must not be null or whitespace.</param>
/// <param name="clientIdentity">Client identity from the authentication context.</param>
/// <param name="ownerKeyId">API key identifier of the caller that created this session.</param>
/// <param name="clientSessionName">Client-supplied session name.</param>
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
/// <param name="commandTimeout">Timeout for command invocation.</param>
/// <param name="startupTimeout">Timeout for worker process startup.</param>
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="leaseDuration">Duration of the session lease; the first lease ends at <paramref name="openedAt"/> + this value.</param>
/// <param name="openedAt">Timestamp when the session opened; also seeds the last-activity timestamp.</param>
/// <param name="eventStreaming">
/// Dependencies the session uses to construct and own its
/// <see cref="SessionEventDistributor"/> (the single per-session worker-event pump
/// that fans raw mapped <see cref="MxEvent"/>s to every subscriber lease). When
/// <see langword="null"/>, <c>SessionEventStreaming.Default</c> is used (no replay logger,
/// system clock, a fresh mapper, and default <see cref="EventOptions"/>) so unit tests
/// that build a session directly still get a working distributor. Production passes the
/// DI-resolved dependencies.
/// </param>
/// <exception cref="ArgumentException">
/// <paramref name="sessionId"/>, <paramref name="backendName"/>, <paramref name="pipeName"/>,
/// or <paramref name="nonce"/> is null, empty, or whitespace (checked in that order).
/// </exception>
public GatewaySession(
    string sessionId,
    string backendName,
    string pipeName,
    string nonce,
    string? clientIdentity,
    string? ownerKeyId,
    string? clientSessionName,
    string? clientCorrelationId,
    TimeSpan commandTimeout,
    TimeSpan startupTimeout,
    TimeSpan shutdownTimeout,
    TimeSpan leaseDuration,
    DateTimeOffset openedAt,
    SessionEventStreaming? eventStreaming = null)
{
    RequireText(sessionId, "Session id is required.", nameof(sessionId));
    RequireText(backendName, "Backend name is required.", nameof(backendName));
    RequireText(pipeName, "Pipe name is required.", nameof(pipeName));
    RequireText(nonce, "Nonce is required.", nameof(nonce));

    // Identity and routing metadata.
    SessionId = sessionId;
    BackendName = backendName;
    PipeName = pipeName;
    Nonce = nonce;
    ClientIdentity = clientIdentity;
    OwnerKeyId = ownerKeyId;
    ClientSessionName = clientSessionName;
    ClientCorrelationId = clientCorrelationId;

    // Timing configuration.
    CommandTimeout = commandTimeout;
    StartupTimeout = startupTimeout;
    ShutdownTimeout = shutdownTimeout;
    LeaseDuration = leaseDuration;
    OpenedAt = openedAt;

    // Opening the session counts as the first client activity and starts the first lease.
    _lastClientActivityAt = openedAt;
    _leaseExpiresAt = openedAt + leaseDuration;

    _eventStreaming = eventStreaming ?? SessionEventStreaming.Default;

    // Throws the same ArgumentException(message, paramName) for any blank required value.
    static void RequireText(string value, string message, string paramName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException(message, paramName);
        }
    }
}

/// <summary>Gets the session identifier.</summary>
public string SessionId { get; }

/// <summary>Gets the backend MXAccess proxy server name.</summary>
public string BackendName { get; }

/// <summary>Gets the named pipe name for gateway-worker IPC.</summary>
public string PipeName { get; }

/// <summary>Gets the security nonce for worker validation.</summary>
public string Nonce { get; }

/// <summary>Gets the client identity from the authentication context.</summary>
public string? ClientIdentity { get; }

/// <summary>Gets the API key identifier of the caller that created this session, or null when none was supplied.</summary>
public string? OwnerKeyId { get; }

/// <summary>Gets the client-supplied session name.</summary>
public string? ClientSessionName { get; }

/// <summary>Gets the client-supplied correlation identifier.</summary>
public string? ClientCorrelationId { get; }

/// <summary>Gets the command invocation timeout.</summary>
public TimeSpan CommandTimeout { get; }

/// <summary>
/// Gets the worker process startup timeout.
/// </summary>
public TimeSpan StartupTimeout { get; }

/// <summary>Gets the worker process shutdown timeout.</summary>
public TimeSpan ShutdownTimeout { get; }

/// <summary>Gets the lease duration for the session.</summary>
public TimeSpan LeaseDuration { get; }

/// <summary>Gets the timestamp when the session opened.</summary>
public DateTimeOffset OpenedAt { get; }

/// <summary>Gets the worker process identifier, or null when no worker is attached yet.</summary>
public int? WorkerProcessId => _workerClient?.ProcessId;

/// <summary>Gets the attached worker client, or null when no worker is attached yet.</summary>
public IWorkerClient? WorkerClient => _workerClient;

/// <summary>Gets the current session state (read under the session lock).</summary>
public SessionState State => ReadUnderSyncRoot(static session => session._state);

/// <summary>Gets the timestamp of the most recent client activity (read under the session lock).</summary>
public DateTimeOffset LastClientActivityAt => ReadUnderSyncRoot(static session => session._lastClientActivityAt);

/// <summary>Gets the lease expiration timestamp, or null when no lease is active (read under the session lock).</summary>
public DateTimeOffset? LeaseExpiresAt => ReadUnderSyncRoot(static session => session._leaseExpiresAt);

/// <summary>Gets the fault description recorded by the last fault, or null (read under the session lock).</summary>
public string? FinalFault => ReadUnderSyncRoot(static session => session._finalFault);

/// <summary>Gets the number of active external event stream subscribers (read under the session lock).</summary>
public int ActiveEventSubscriberCount => ReadUnderSyncRoot(static session => session._activeEventSubscriberCount);

/// <summary>
/// Attaches (or replaces) the worker client for this session.
/// </summary>
/// <param name="workerClient">Worker client to attach.</param>
/// <exception cref="ArgumentNullException"><paramref name="workerClient"/> is null.</exception>
public void AttachWorkerClient(IWorkerClient workerClient)
{
    ArgumentNullException.ThrowIfNull(workerClient);

    lock (_syncRoot)
    {
        _workerClient = workerClient;
    }
}

// Evaluates a field read while holding _syncRoot. The selector is expected to be a
// static lambda so the getters above allocate no closure per call.
private T ReadUnderSyncRoot<T>(Func<GatewaySession, T> read)
{
    lock (_syncRoot)
    {
        return read(this);
    }
}

/// <summary>
/// Transitions the session to a new state, subject to the terminal-state rules below.
/// </summary>
/// <param name="nextState">Next session state to transition to.</param>
/// <remarks>
/// <see cref="SessionState.Closed"/> is terminal.
/// <see cref="SessionState.Faulted"/> only allows a transition to <see cref="SessionState.Closed"/>.
/// <see cref="SessionState.Closing"/> only allows a transition to
/// <see cref="SessionState.Closed"/> (or <see cref="SessionState.Faulted"/>) — once
/// <see cref="CloseAsync"/> has started, no late lifecycle callback can revive the
/// session by walking it back to <see cref="SessionState.Ready"/> or any earlier
/// state.
/// Both close-related writes (Closing and Closed) go through
/// <c>_syncRoot</c> just like every other state read/write, closing the split-lock
/// race called out in Server-015. A disallowed transition is ignored silently; no
/// exception is thrown.
/// </remarks>
public void TransitionTo(SessionState nextState)
{
    lock (_syncRoot)
    {
        // Decide, from the CURRENT state, whether the requested state may be entered.
        bool permitted = _state switch
        {
            SessionState.Closed => false,
            SessionState.Faulted => nextState is SessionState.Closed,
            SessionState.Closing => nextState is SessionState.Closed or SessionState.Faulted,
            _ => true,
        };

        if (permitted)
        {
            _state = nextState;
        }
    }
}

/// <summary>
/// Transitions the session to <see cref="SessionState.Ready"/> and then starts the
/// internal dashboard mirror.
/// </summary>
/// <remarks>
/// When a dashboard broadcaster is configured, the mirror registers an internal
/// subscriber on the distributor and starts the pump before any gRPC client attaches.
/// The dashboard EventsHub therefore receives session events even with no gRPC
/// subscriber streaming, which fixes the "dark feed" where the dashboard only saw events
/// while a gRPC client was actively streaming. Because the internal subscriber is
/// registered before the pump is started, the pump never drains a fast-completing worker
/// stream into an empty subscriber set (the Task 4 hazard). If the session is already
/// closing, closed, or faulted, the Ready transition is refused and starting the mirror
/// is a no-op.
/// </remarks>
public void MarkReady()
{
    TransitionTo(SessionState.Ready);
    StartDashboardMirror();
}

// Returns a new EXTERNAL subscriber lease on the per-session distributor, creating the
// distributor on first use (shared with the dashboard mirror via EnsureDistributorCreated)
// and starting its pump if nobody has yet. With a dashboard broadcaster configured,
// MarkReady's mirror normally claims the start first. Without one, the first
// AttachEventSubscriber starts the pump, preserving the original "events start flowing
// on subscribe" behavior and never draining a fast-completing source into the void
// before any subscriber exists.
// The source factory (MapWorkerEventsAsync) reproduces the mapping, ordering, and start
// behavior of the pre-Task-4 EventStreamService.ProduceEventsAsync. It drains the worker
// event stream in source order and maps each WorkerEvent to the public MxEvent with the
// same mapper, with no skip/filter. Per-RPC filtering (e.g. AfterWorkerSequence) stays
// at the subscriber boundary in EventStreamService. When this call is the one that
// claims the pump start, the lease is registered before the pump runs, so that first
// subscriber sees the stream from its beginning.
private IEventSubscriberLease StartDistributorAndRegister()
{
    var distributor = EnsureDistributorCreated(out var claimedStart);

    // Register first, start second: StartAsync only schedules the pump task and never
    // blocks, but the subscriber must already be present when it begins draining.
    var subscriberLease = distributor.Register();
    StartPumpIfRequested(distributor, claimedStart);

    return subscriberLease;
}

// Builds the distributor at most once and tells the caller whether IT must start the
// pump (it observed the not-yet-started state and claimed it). Creation and the
// started-flag flip share one _syncRoot section, so concurrent callers (MarkReady's
// dashboard mirror racing a first AttachEventSubscriber) agree on a single distributor
// and a single start.
private SessionEventDistributor EnsureDistributorCreated(out bool startNow)
{
    lock (_syncRoot)
    {
        SessionEventDistributor distributor = _eventDistributor ??= BuildDistributor();

        // Exactly one caller ever observes "not started"; every later caller gets false.
        startNow = !_eventDistributorStarted;
        _eventDistributorStarted = true;

        return distributor;
    }

    // Wires the distributor to this session's event source and the configured event options.
    SessionEventDistributor BuildDistributor()
    {
        EventOptions options = _eventStreaming.EventOptions;
        return new SessionEventDistributor(
            SessionId,
            MapWorkerEventsAsync,
            options.QueueCapacity,
            options.ReplayBufferCapacity,
            options.ReplayRetentionSeconds,
            _eventStreaming.DistributorLogger,
            _eventStreaming.TimeProvider,
            CreateOverflowHandler(options.BackpressurePolicy),
            singleSubscriberMode: !_eventStreaming.AllowMultipleEventSubscribers);
    }
}

private static void StartPumpIfRequested(SessionEventDistributor distributor, bool startNow)
{
    if (startNow)
    {
        // StartAsync only schedules the pump via Task.Run and returns a completed task; it
        // performs no async I/O itself, so blocking on it here cannot deadlock. If StartAsync
        // ever awaits real I/O before returning, this call site must become async too.
        distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult();
    }
}

// Registers the gateway-owned internal dashboard subscriber on the distributor and starts
// a background loop that mirrors every fanned event to the dashboard broadcaster. Called
// when the session becomes Ready; a second call is a no-op. The internal subscriber is
// registered before the pump is started (see EnsureDistributorCreated), so the dashboard
// receives events even with no gRPC subscriber attached, and the Task 4 "zero-subscriber
// drain into the void" hang cannot occur. Does nothing when no dashboard broadcaster was
// supplied (unit tests).
//
// Race-safety (Issue 1): _dashboardMirrorLease and _dashboardMirrorTask are published
// together in a SINGLE second _syncRoot section, and disposal reads/nulls them under that
// same lock. EnsureDistributorCreated/Register/StartPump run outside _syncRoot to avoid
// lock inversion with the distributor's own lifecycle lock. Afterwards we re-enter
// _syncRoot and check for a concurrent close. If the session is already Closing/Closed/
// Faulted, the just-created lease is NOT published and the mirror task is NOT started.
// That lease is then disposed AFTER _syncRoot is released, so the distributor's
// unregister path never runs under the session lock.
private void StartDashboardMirror()
{
    IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster;
    if (broadcaster is null)
    {
        return;
    }

    CancellationToken loopToken;
    lock (_syncRoot)
    {
        if (_dashboardMirrorStarted
            || _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
        {
            return;
        }

        _dashboardMirrorStarted = true;
        _dashboardMirrorCts = new CancellationTokenSource();
        loopToken = _dashboardMirrorCts.Token;
    }

    // Create the distributor (claiming the start if we are first) and register the
    // internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard
    // subscriber out of the single-subscriber overflow accounting, so a slow/broken
    // dashboard mirror only disconnects itself and never faults the session.
    SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
    IEventSubscriberLease lease = distributor.Register(isInternal: true);
    StartPumpIfRequested(distributor, startNow);

    bool closedConcurrently;
    lock (_syncRoot)
    {
        closedConcurrently = _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted;
        if (!closedConcurrently)
        {
            // Publish BOTH the lease and the task in one lock section so disposal always
            // sees them consistently: either both set or both null.
            _dashboardMirrorLease = lease;
            _dashboardMirrorTask = Task.Run(
                () => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
                CancellationToken.None);
        }
    }

    if (closedConcurrently)
    {
        // Close/dispose started while we were registering. The lease was never published,
        // so nobody else will release it. Dispose it here, outside _syncRoot, to honor the
        // "no distributor calls under the session lock" rule above.
        lease.Dispose();
    }
}

// Reads the internal dashboard subscriber's channel and publishes each RAW fanned event
// to the dashboard broadcaster. The dashboard is a first-class distributor subscriber
// (Task 6). It sees the session's full raw event activity, NOT the per-gRPC-subscriber
// AfterWorkerSequence filtering that EventStreamService applies at its own boundary. This
// is intentional: the dashboard is a separate LDAP-authenticated monitoring view
// (per-session dashboard ACL is the separate Task 18). Publish is best-effort and never
// throws, so a slow or broken dashboard cannot fault the session or stall the pump. The
// bounded internal subscriber channel (Task 5 per-subscriber isolation) only disconnects
// THIS mirror on overflow, leaving the session and other subscribers untouched.
private async Task RunDashboardMirrorAsync(
    IDashboardEventBroadcaster broadcaster,
    IEventSubscriberLease lease,
    CancellationToken cancellationToken)
{
    try
    {
        await foreach (MxEvent mxEvent in lease.Reader
            .ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            try
            {
                broadcaster.Publish(SessionId, mxEvent);
            }
            catch (Exception exception)
            {
                // Publish is documented never-throw; enforce it here too so a future
                // implementation cannot end the mirror loop. Logs identifiers only.
                _eventStreaming.DistributorLogger.LogDebug(
                    exception,
                    "Dashboard event mirror threw for session {SessionId}; continuing.",
                    SessionId);
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Teardown path: the session is shutting down the mirror.
    }
    catch (SessionManagerException)
    {
        // The internal subscriber's channel overflowed and the distributor disconnected
        // it with a terminal overflow fault. That disconnects only the dashboard mirror;
        // the session, pump, and any gRPC subscriber are unaffected. Stop mirroring.
    }
    catch (Exception exception)
    {
        // Source-fault completion (worker event stream terminated abnormally) surfaces
        // here. The session's own fault handling runs via the gRPC path / lifecycle; the
        // mirror just stops. Logs identifiers only.
        _eventStreaming.DistributorLogger.LogDebug(
            exception,
            "Dashboard event mirror loop ended for session {SessionId}.",
            SessionId);
    }
}

// Builds the per-subscriber backpressure handler the distributor invokes when a
// subscriber's bounded channel overflows. The distributor always disconnects the
// offending subscriber with an EventQueueOverflow fault. This handler adds the observable
// side effects, preserving exactly what the pre-epic per-RPC overflow path emitted:
//   - always record the queue-overflow metric, labeled by subscriber kind;
//   - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole
//     session and record the fault metric, matching back-compat behavior;
//   - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT
//     fault the session. The distributor's disconnect of the one slow subscriber is the
//     whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber
//     FailFast deliberately degrades to a disconnect because faulting a shared session
//     on one slow consumer would punish healthy subscribers.
// The delegate carries isInternal directly (Issue 4), so the metric label needs no
// heuristic: "dashboard-mirror" for internal, "grpc-event-stream" for external.
private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
{
    GatewayMetrics metrics = _eventStreaming.Metrics;
    string sessionId = SessionId;
    bool failFast = policy == EventBackpressurePolicy.FailFast;

    return (isOnlySubscriber, isInternal) =>
    {
        // Internal overflow = the gateway-owned dashboard mirror; external = a gRPC client.
        metrics.QueueOverflow(isInternal ? "dashboard-mirror" : "grpc-event-stream");

        // Only the legacy single-subscriber FailFast case escalates to a session fault.
        if (!failFast || !isOnlySubscriber)
        {
            return;
        }

        MarkFaulted($"Session {sessionId} event stream queue overflowed.");
        metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
    };
}

// The distributor's single event source. It drains the worker event stream once (the
// distributor guarantees a single consumer) and maps each frame to the public MxEvent in
// worker order, matching the former ProduceEventsAsync mapping.
private async IAsyncEnumerable<MxEvent> MapWorkerEventsAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    MxAccessGrpcMapper eventMapper = _eventStreaming.Mapper;

    await foreach (WorkerEvent rawEvent in ReadEventsAsync(cancellationToken).ConfigureAwait(false))
    {
        yield return eventMapper.MapEvent(rawEvent);
    }
}

/// <summary>
/// Moves the session to <see cref="SessionState.Faulted"/> and records the fault
/// description, unless the session is already <see cref="SessionState.Closed"/>.
/// </summary>
/// <param name="reason">Reason for the fault; exposed afterwards via <see cref="FinalFault"/>.</param>
public void MarkFaulted(string reason)
{
    lock (_syncRoot)
    {
        if (_state is not SessionState.Closed)
        {
            _finalFault = reason;
            _state = SessionState.Faulted;
        }
    }
}

/// <summary>
/// Records client activity and slides the lease so it expires
/// <see cref="LeaseDuration"/> after <paramref name="activityAt"/>.
/// </summary>
/// <param name="activityAt">Timestamp of the client activity.</param>
public void TouchClientActivity(DateTimeOffset activityAt)
{
    DateTimeOffset newExpiry = activityAt + LeaseDuration;

    lock (_syncRoot)
    {
        _lastClientActivityAt = activityAt;
        _leaseExpiresAt = newExpiry;
    }
}

/// <summary>
/// Extends the session lease to the specified expiration time.
/// </summary>
/// <param name="leaseExpiresAt">Timestamp when the lease expires.</param>
/// <remarks>
/// The expiry is overwritten unconditionally, so a value earlier than the current expiry
/// shortens the lease. Any later <see cref="TouchClientActivity"/> resets it again.
/// </remarks>
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
    lock (_syncRoot)
    {
        _leaseExpiresAt = leaseExpiresAt;
    }
}

/// <summary>
/// Determines whether the session lease has expired.
/// </summary>
/// <param name="now">Current timestamp for comparison.</param>
/// <returns>
/// <see langword="true"/> only when no external event subscriber is attached, a lease
/// expiry is set, and that expiry is at or before <paramref name="now"/>. An active event
/// stream therefore keeps the session alive regardless of the lease.
/// </returns>
public bool IsLeaseExpired(DateTimeOffset now)
{
    lock (_syncRoot)
    {
        return _activeEventSubscriberCount == 0 && _leaseExpiresAt is not null && _leaseExpiresAt <= now;
    }
}

/// <summary>
/// Attaches an event subscriber and returns a lease whose
/// <see cref="IEventSubscriberLease.Reader"/> reads the fanned public
/// <see cref="MxEvent"/>s for this subscriber. The returned lease, when disposed,
/// unregisters the distributor subscriber AND decrements the active-subscriber count.
/// </summary>
/// <param name="allowMultipleSubscribers">
/// When <see langword="false"/>, single-subscriber mode: a second concurrent EXTERNAL
/// subscriber is rejected with <see cref="SessionManagerErrorCode.EventSubscriberAlreadyActive"/>.
/// When <see langword="true"/>, multi-subscriber mode: up to
/// <paramref name="maxSubscribers"/> concurrent EXTERNAL subscribers are allowed; the
/// next attach is rejected with
/// <see cref="SessionManagerErrorCode.EventSubscriberLimitReached"/>.
/// </param>
/// <param name="maxSubscribers">
/// Maximum concurrent external subscribers in multi-subscriber mode
/// (<c>MxGateway:Sessions:MaxEventSubscribersPerSession</c>). Ignored when
/// <paramref name="allowMultipleSubscribers"/> is <see langword="false"/> (the effective
/// cap is then 1). Values below 1 are treated as 1. The gateway-owned internal dashboard
/// subscriber is registered directly on the distributor and is NOT counted here, so it
/// never consumes cap budget.
/// </param>
/// <returns>A lease for the newly registered external subscriber.</returns>
/// <exception cref="SessionManagerException">
/// <see cref="SessionManagerErrorCode.SessionNotReady"/> when the session or its worker
/// is not Ready, or one of the cap error codes above when the subscriber cap is reached.
/// </exception>
/// <remarks>
/// The count-check-and-increment runs atomically under <c>_syncRoot</c>, so two
/// concurrent attaches racing toward the cap can never both succeed past it. If
/// distributor registration fails, the count is rolled back (see the catch below).
/// </remarks>
public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers, int maxSubscribers)
{
    // Effective cap: 1 in single-subscriber mode, otherwise the configured maximum,
    // clamped to at least 1 so a misconfigured non-positive value cannot make every
    // attach fail in multi-subscriber mode.
    int effectiveCap = allowMultipleSubscribers ?
        Math.Max(1, maxSubscribers) : 1;
    lock (_syncRoot)
    {
        // Both the session AND the attached worker must be Ready. The message reports only
        // the session state, so a Ready session with a non-Ready worker reads "Ready".
        if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotReady,
                $"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
        }

        if (_activeEventSubscriberCount >= effectiveCap)
        {
            // Distinct error codes so callers can tell "single-subscriber mode" from
            // "multi-subscriber cap reached".
            throw allowMultipleSubscribers
                ? new SessionManagerException(
                    SessionManagerErrorCode.EventSubscriberLimitReached,
                    $"Session {SessionId} has reached its maximum of {effectiveCap} concurrent event stream subscribers.")
                : new SessionManagerException(
                    SessionManagerErrorCode.EventSubscriberAlreadyActive,
                    $"Session {SessionId} already has an active event stream subscriber.");
        }

        _activeEventSubscriberCount++;
    }

    // Construct/start the distributor and register this subscriber. This runs outside the
    // guard lock (StartDistributorAndRegister takes _syncRoot itself for construction).
    // On any failure, roll back the count taken above so the guard stays consistent.
    try
    {
        IEventSubscriberLease distributorLease = StartDistributorAndRegister();
        return new EventSubscriberLease(this, distributorLease);
    }
    catch
    {
        DetachEventSubscriber();
        throw;
    }
}

/// <summary>
/// Sends a worker command to the attached worker and awaits its reply (request/reply).
/// Counts as client activity, so the lease is refreshed before the command is sent.
/// </summary>
/// <param name="command">Worker command to invoke.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The worker's reply to <paramref name="command"/>.</returns>
/// <remarks>
/// The worker call is bounded by <see cref="CommandTimeout"/>. The ready-check is done by
/// <c>GetReadyWorkerClient</c>; NOTE(review): presumably it throws when no Ready worker
/// is attached — confirm against its implementation.
/// </remarks>
public async Task InvokeAsync(
    WorkerCommand command,
    CancellationToken cancellationToken)
{
    IWorkerClient workerClient = GetReadyWorkerClient();
    TouchClientActivity(DateTimeOffset.UtcNow);
    return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}

/// <summary>Gets the item registration for a server and item handle pair.</summary>
/// <param name="serverHandle">The MXAccess server handle.</param>
/// <param name="itemHandle">The MXAccess item handle.</param>
/// <param name="registration">The item registration if found.</param>
/// <returns><see langword="true"/> when a registration is tracked for the handle pair.</returns>
public bool TryGetItemRegistration(
    int serverHandle,
    int itemHandle,
    out SessionItemRegistration registration)
{
    var key = (serverHandle, itemHandle);

    lock (_syncRoot)
    {
        return _items.TryGetValue(key, out registration!);
    }
}

/// <summary>
/// Updates the tracked item registrations from a successfully executed command.
/// </summary>
/// <param name="command">The executed command.</param>
/// <param name="reply">The command reply.</param>
/// <remarks>
/// Replies whose protocol status is not <c>Ok</c> (or is missing) are ignored. Add-style
/// commands record the handle(s) returned in the reply payload, and only when that payload
/// is present. Remove/unsubscribe commands drop the handles named in the command itself.
/// </remarks>
public void TrackCommandReply(
    MxCommand command,
    MxCommandReply reply)
{
    bool commandSucceeded = reply.ProtocolStatus?.Code is ProtocolStatusCode.Ok;
    if (!commandSucceeded)
    {
        return;
    }

    lock (_syncRoot)
    {
        switch (command.Kind)
        {
            case MxCommandKind.AddItem:
                if (reply.AddItem is not null)
                {
                    TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, command.AddItem.ItemDefinition);
                }

                break;

            case MxCommandKind.AddItem2:
                if (reply.AddItem2 is not null)
                {
                    TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, command.AddItem2.ItemDefinition);
                }

                break;

            case MxCommandKind.AddBufferedItem:
                if (reply.AddBufferedItem is not null)
                {
                    TrackItem(command.AddBufferedItem.ServerHandle, reply.AddBufferedItem.ItemHandle, command.AddBufferedItem.ItemDefinition);
                }

                break;

            case MxCommandKind.AddItemBulk:
                if (reply.AddItemBulk is not null)
                {
                    TrackBulkItems(reply.AddItemBulk);
                }

                break;

            case MxCommandKind.SubscribeBulk:
                if (reply.SubscribeBulk is not null)
                {
                    TrackBulkItems(reply.SubscribeBulk);
                }

                break;

            case MxCommandKind.RemoveItem:
                _items.Remove((command.RemoveItem.ServerHandle, command.RemoveItem.ItemHandle));
                break;

            case MxCommandKind.RemoveItemBulk:
                RemoveItems(command.RemoveItemBulk.ServerHandle, command.RemoveItemBulk.ItemHandles);
                break;

            case MxCommandKind.UnsubscribeBulk:
                RemoveItems(command.UnsubscribeBulk.ServerHandle, command.UnsubscribeBulk.ItemHandles);
                break;
        }
    }
}

/// <summary>
/// Executes a bulk add-item command for the specified server and tag addresses.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to add.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>AddItemBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is null.</exception>
public Task> AddItemBulkAsync(
    int serverHandle,
    IReadOnlyList tagAddresses,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(tagAddresses);
    AddItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    // Copy the caller's addresses into the command's repeated field.
    bulkCommand.TagAddresses.Add(tagAddresses);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.AddItemBulk,
            AddItemBulk = bulkCommand,
        },
        // Selects the bulk payload out of the generic command reply.
        reply => reply.AddItemBulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk advise-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to advise.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>AdviseItemBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
public Task> AdviseItemBulkAsync(
    int serverHandle,
    IReadOnlyList itemHandles,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(itemHandles);
    AdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.ItemHandles.Add(itemHandles);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.AdviseItemBulk,
            AdviseItemBulk = bulkCommand,
        },
        reply => reply.AdviseItemBulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk remove-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to remove.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>RemoveItemBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
public Task> RemoveItemBulkAsync(
    int serverHandle,
    IReadOnlyList itemHandles,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(itemHandles);
    RemoveItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.ItemHandles.Add(itemHandles);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.RemoveItemBulk,
            RemoveItemBulk = bulkCommand,
        },
        reply => reply.RemoveItemBulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk un-advise-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to un-advise.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>UnAdviseItemBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
public Task> UnAdviseItemBulkAsync(
    int serverHandle,
    IReadOnlyList itemHandles,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(itemHandles);
    UnAdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.ItemHandles.Add(itemHandles);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.UnAdviseItemBulk,
            UnAdviseItemBulk = bulkCommand,
        },
        reply => reply.UnAdviseItemBulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk subscribe command for the specified server and tag addresses.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to subscribe to.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>SubscribeBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is null.</exception>
public Task> SubscribeBulkAsync(
    int serverHandle,
    IReadOnlyList tagAddresses,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(tagAddresses);
    SubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.TagAddresses.Add(tagAddresses);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.SubscribeBulk,
            SubscribeBulk = bulkCommand,
        },
        reply => reply.SubscribeBulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk unsubscribe command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to unsubscribe from.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkAsync</c> for the <c>UnsubscribeBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
public Task> UnsubscribeBulkAsync(
    int serverHandle,
    IReadOnlyList itemHandles,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(itemHandles);
    UnsubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.ItemHandles.Add(itemHandles);
    return InvokeBulkAsync(
        new MxCommand
        {
            Kind = MxCommandKind.UnsubscribeBulk,
            UnsubscribeBulk = bulkCommand,
        },
        reply => reply.UnsubscribeBulk,
        cancellationToken);
}

/// <summary>Executes a bulk Write command for the specified server and per-item entries.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkWriteAsync</c> for the <c>WriteBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entries"/> is null.</exception>
public Task> WriteBulkAsync(
    int serverHandle,
    IReadOnlyList entries,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(entries);
    WriteBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.Entries.Add(entries);
    // Write-family bulk commands go through the write-specific invoke helper.
    return InvokeBulkWriteAsync(
        new MxCommand
        {
            Kind = MxCommandKind.WriteBulk,
            WriteBulk = bulkCommand,
        },
        reply => reply.WriteBulk,
        cancellationToken);
}

/// <summary>Executes a bulk Write2 (timestamped) command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkWriteAsync</c> for the <c>Write2Bulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entries"/> is null.</exception>
public Task> Write2BulkAsync(
    int serverHandle,
    IReadOnlyList entries,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(entries);
    Write2BulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.Entries.Add(entries);
    return InvokeBulkWriteAsync(
        new MxCommand
        {
            Kind = MxCommandKind.Write2Bulk,
            Write2Bulk = bulkCommand,
        },
        reply => reply.Write2Bulk,
        cancellationToken);
}

/// <summary>Executes a bulk WriteSecured command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkWriteAsync</c> for the <c>WriteSecuredBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entries"/> is null.</exception>
public Task> WriteSecuredBulkAsync(
    int serverHandle,
    IReadOnlyList entries,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(entries);
    WriteSecuredBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.Entries.Add(entries);
    return InvokeBulkWriteAsync(
        new MxCommand
        {
            Kind = MxCommandKind.WriteSecuredBulk,
            WriteSecuredBulk = bulkCommand,
        },
        reply => reply.WriteSecuredBulk,
        cancellationToken);
}

/// <summary>Executes a bulk WriteSecured2 command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkWriteAsync</c> for the <c>WriteSecured2Bulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entries"/> is null.</exception>
public Task> WriteSecured2BulkAsync(
    int serverHandle,
    IReadOnlyList entries,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(entries);
    WriteSecured2BulkCommand bulkCommand = new() { ServerHandle = serverHandle };
    bulkCommand.Entries.Add(entries);
    return InvokeBulkWriteAsync(
        new MxCommand
        {
            Kind = MxCommandKind.WriteSecured2Bulk,
            WriteSecured2Bulk = bulkCommand,
        },
        reply => reply.WriteSecured2Bulk,
        cancellationToken);
}

/// <summary>
/// Executes a bulk Read command — see ReadBulkCommand's doc
/// comment in the .proto for the cached-vs-snapshot semantics.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to read.</param>
/// <param name="timeout">
/// Timeout for the read operation. Zero or negative values are sent as 0 ms. Positive
/// values are sent as whole milliseconds (fraction truncated), capped at
/// <see cref="uint.MaxValue"/>.
/// </param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The result produced by <c>InvokeBulkReadAsync</c> for the <c>ReadBulk</c> reply payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is null.</exception>
public Task> ReadBulkAsync(
    int serverHandle,
    IReadOnlyList tagAddresses,
    TimeSpan timeout,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(tagAddresses);
    ReadBulkCommand bulkCommand = new()
    {
        ServerHandle = serverHandle,
        // Clamp before casting so very large TimeSpans cannot overflow the uint field.
        TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue),
    };
    bulkCommand.TagAddresses.Add(tagAddresses);
    return InvokeBulkReadAsync(
        new MxCommand
        {
            Kind = MxCommandKind.ReadBulk,
            ReadBulk = bulkCommand,
        },
        reply => reply.ReadBulk,
        cancellationToken);
}

/// <summary>
/// Reads events from the worker as an asynchronous enumerable stream.
/// </summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The attached worker's event stream.</returns>
/// <remarks>
/// The ready worker client is resolved and client activity is recorded eagerly, when this
/// method is called, not when enumeration starts. NOTE(review): presumably
/// <c>GetReadyWorkerClient</c> throws when no Ready worker is attached — confirm.
/// </remarks>
public IAsyncEnumerable ReadEventsAsync(CancellationToken cancellationToken)
{
    IWorkerClient workerClient = GetReadyWorkerClient();
    TouchClientActivity(DateTimeOffset.UtcNow);
    return workerClient.ReadEventsAsync(cancellationToken);
}

/// <summary>
/// Closes the session and shuts down the worker process.
/// </summary>
/// <param name="reason">Reason for closing the session.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <remarks>
/// Concurrent close attempts are serialized by <c>_closeLock</c>, so only one close
/// runs at a time. Every read/write of <c>_state</c> still passes through
/// <c>_syncRoot</c> (via <see cref="TryBeginClose"/> and <see cref="MarkClosed"/>).
/// The close path therefore obeys the same lock discipline as the other <c>_state</c>
/// accessors (e.g. <see cref="GetReadyWorkerClient"/>), and a concurrent
/// TransitionTo(Ready) cannot race past a Closing write.
/// If the wait for <c>_closeLock</c> is cancelled, the resulting
/// <see cref="OperationCanceledException"/> propagates unwrapped (the lock was never held).
/// Any failure after the lock is acquired surfaces as a
/// <see cref="SessionCloseStartedException"/>. In that case <see cref="MarkClosed"/> is not
/// reached, so the session is left in Closing rather than Closed.
/// </remarks>
/// <returns>
/// A result with <c>AlreadyClosed: true</c> when the session was already Closed.
/// Otherwise the session ends Closed, and <c>AlreadyClosed</c> reports whether an earlier
/// close attempt had already started.
/// </returns>
public async Task CloseAsync(
    string reason,
    CancellationToken cancellationToken)
{
    await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        try
        {
            if (!TryBeginClose(out bool alreadyClosing))
            {
                return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
            }

            if (_workerClient is not null)
            {
                try
                {
                    await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception exception)
                {
                    // Graceful shutdown failed (or was cancelled): force-kill the worker so it
                    // does not outlive the session.
                    try
                    {
                        _workerClient.Kill(reason);
                    }
                    catch (Exception killException)
                    {
                        // Both shutdown and kill failed; surface both causes together.
                        throw new SessionCloseStartedException(
                            $"Session {SessionId} close failed after worker shutdown started.",
                            new AggregateException(exception, killException));
                    }

                    // Kill succeeded; rethrow the original shutdown failure. The outer catch
                    // wraps it in SessionCloseStartedException.
                    throw;
                }
            }

            MarkClosed();
            return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
        }
        catch (Exception exception) when (exception is not SessionCloseStartedException)
        {
            // Normalize every other failure after the lock was taken, so callers can tell
            // that close had already begun (state may be Closing).
            throw new SessionCloseStartedException(
                $"Session {SessionId} close failed after the close lock was acquired.",
                exception);
        }
    }
    finally
    {
        _closeLock.Release();
    }
}

// Returns false when the session is already Closed; the caller then short-circuits with
// AlreadyClosed: true. Otherwise it sets _closeStarted = true and _state = Closing under
// _syncRoot, so a concurrent TransitionTo(Ready) — which only refuses to overwrite
// Closed/Faulted — cannot flip the session back to Ready after close started. The
// `alreadyClosing` out parameter reports the value `_closeStarted` had before this call,
// preserving the contract that a second close returns AlreadyClosed: alreadyClosing.
private bool TryBeginClose(out bool alreadyClosing)
{
    lock (_syncRoot)
    {
        // Report whether an earlier close attempt got this far, regardless of the outcome below.
        alreadyClosing = _closeStarted;

        bool canBeginClose = _state is not SessionState.Closed;
        if (canBeginClose)
        {
            _closeStarted = true;
            _state = SessionState.Closing;
        }

        return canBeginClose;
    }
}

// Terminal transition to Closed, written under _syncRoot like every other _state write.
// No precondition check is needed: Closed is terminal and TransitionTo will not overwrite it.
private void MarkClosed()
{
    lock (_syncRoot)
    {
        _state = SessionState.Closed;
    }
}

/// <summary>
/// Kills the worker process immediately (if one exists) and moves the session to Closed.
/// </summary>
/// <remarks>
/// Does not take <c>_closeLock</c>. Use <see cref="KillWorkerWithCloseGateAsync"/> when the
/// kill must serialize with <see cref="CloseAsync"/>.
/// </remarks>
/// <param name="reason">Reason passed to the worker client's kill call.</param>
public void KillWorker(string reason)
{
    IWorkerClient? workerClient = _workerClient;
    workerClient?.Kill(reason);
    TransitionTo(SessionState.Closed);
}

/// <summary>
/// Kills the worker process while holding the per-session close lock, so concurrent
/// close/kill callers run one at a time. It reports whether the session was already Closed
/// when the lock was taken, letting callers avoid double-counting close metrics.
/// </summary>
/// <remarks>
/// Uses the same <c>_closeLock</c> as <see cref="CloseAsync"/>. A Close in flight from one
/// caller and a Kill from another therefore cannot race on the "already closed" observation
/// that drives metric increments (Server-045).
/// </remarks>
/// <param name="reason">Reason passed to the worker client's kill call.</param>
/// <param name="cancellationToken">Cancels the wait for the close lock.</param>
/// <returns><c>true</c> if the session was already <see cref="SessionState.Closed"/> when the lock was acquired; otherwise <c>false</c>.</returns>
public async ValueTask KillWorkerWithCloseGateAsync(
    string reason,
    CancellationToken cancellationToken)
{
    await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        bool wasClosed;
        lock (_syncRoot)
        {
            wasClosed = _state is SessionState.Closed;
        }

        _workerClient?.Kill(reason);
        TransitionTo(SessionState.Closed);
        return wasClosed;
    }
    finally
    {
        _closeLock.Release();
    }
}

/// <summary>
/// Disposes the session and frees associated resources.
/// </summary>
/// <remarks>
/// Acquires <c>_closeLock</c> once before disposing, so an in-flight
/// <see cref="CloseAsync"/> finishes before the semaphore is released and
/// reclaimed. Without this gate, the in-flight close's <c>_closeLock.Release()</c>
/// would race the dispose and raise <see cref="ObjectDisposedException"/>.
/// The acquire is best-effort: a non-cancellable wait that swallows
/// <see cref="ObjectDisposedException"/>, so double-dispose still completes.
/// Teardown order: close-lock gate, dashboard mirror, event distributor, then worker client.
/// </remarks>
public async ValueTask DisposeAsync()
{
    try
    {
        // CancellationToken.None — disposal must not be cancelled, and a misbehaving
        // close path that never releases would have to be torn down by the worker
        // shutdown timeout long before we reach here.
        await _closeLock.WaitAsync(CancellationToken.None).ConfigureAwait(false);
        try
        {
            // Hand the slot back so the semaphore's internal counter is consistent
            // for any contemporaneous waiter, then dispose. Once disposed, every
            // subsequent WaitAsync / Release will throw — but DisposeAsync's contract
            // is "no concurrent close after this point", which SessionManager honors.
            _closeLock.Release();
        }
        catch (ObjectDisposedException)
        {
            // Disposed between the wait and the release; nothing left to hand back.
        }
    }
    catch (ObjectDisposedException)
    {
        // Already disposed (e.g. double-dispose); nothing to gate on.
    }

    try
    {
        _closeLock.Dispose();
    }
    catch (ObjectDisposedException)
    {
        // Best-effort: a prior dispose already reclaimed the semaphore.
    }

    // Stop the internal dashboard mirror first: cancel its loop, dispose its lease (which
    // unregisters its internal distributor subscriber and completes its channel), and
    // await the loop task. This happens BEFORE disposing the distributor and worker client,
    // so the mirror is no longer reading the pump when the pump and its source (the worker
    // client) tear down.
    IEventSubscriberLease? dashboardLease;
    Task? dashboardTask;
    CancellationTokenSource? dashboardCts;
    lock (_syncRoot)
    {
        dashboardLease = _dashboardMirrorLease;
        dashboardTask = _dashboardMirrorTask;
        dashboardCts = _dashboardMirrorCts;
        _dashboardMirrorLease = null;
        _dashboardMirrorTask = null;
        _dashboardMirrorCts = null;
    }

    if (dashboardCts is not null)
    {
        await dashboardCts.CancelAsync().ConfigureAwait(false);
    }

    dashboardLease?.Dispose();
    if (dashboardTask is not null)
    {
        try
        {
            await dashboardTask.ConfigureAwait(false);
        }
        catch (Exception)
        {
            // The mirror loop swallows its own faults; any escape here must not block
            // disposal. The loop has stopped, which is all teardown requires.
        }
    }

    dashboardCts?.Dispose();

    // Stop the event pump and complete every subscriber channel before tearing down the
    // worker client (the pump's source). DisposeAsync is the single session teardown
    // point (SessionManager.RemoveSessionAsync awaits it after close). Awaiting it here
    // guarantees the distributor's pump task is observed and subscribers are completed
    // rather than left dangling.
    SessionEventDistributor? distributor;
    lock (_syncRoot)
    {
        distributor = _eventDistributor;
        _eventDistributor = null;
    }

    if (distributor is not null)
    {
        await distributor.DisposeAsync().ConfigureAwait(false);
    }

    if (_workerClient is not null)
    {
        await _workerClient.DisposeAsync().ConfigureAwait(false);
    }
}

// Runs a bulk command through InvokeBulkInternalAsync and projects the typed payload's
// Results into an array. A missing payload yields an empty collection.
private async Task> InvokeBulkAsync(
    MxCommand command,
    Func payloadAccessor,
    CancellationToken cancellationToken)
{
    MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
    return payloadAccessor(reply)?.Results.ToArray() ?? [];
}

// Bulk-write counterpart of InvokeBulkAsync. The same projection is used; it differs
// only in the reply payload/result types.
private async Task> InvokeBulkWriteAsync(
    MxCommand command,
    Func payloadAccessor,
    CancellationToken cancellationToken)
{
    MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
    return payloadAccessor(reply)?.Results.ToArray() ?? [];
}

// Bulk-read counterpart of InvokeBulkAsync. The same projection is used; it differs
// only in the reply payload/result types.
private async Task> InvokeBulkReadAsync(
    MxCommand command,
    Func payloadAccessor,
    CancellationToken cancellationToken)
{
    MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
    return payloadAccessor(reply)?.Results.ToArray() ?? [];
}

// Single round-trip + protocol-status check shared by every bulk variant.
// Callers project the typed reply payload out via their own accessor. The
// outer envelope handling is identical across SubscribeResult-based bulks,
// BulkWriteResult-based writes, and BulkReadResult-based reads.
// Throws SessionManagerException (SessionNotReady) when the reply is missing or its
// protocol status is not Ok.
private async Task InvokeBulkInternalAsync(
    MxCommand command,
    CancellationToken cancellationToken)
{
    WorkerCommandReply workerReply = await InvokeAsync(
            new WorkerCommand { Command = command },
            cancellationToken)
        .ConfigureAwait(false);
    MxCommandReply reply = workerReply.Reply ?? new MxCommandReply
    {
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.ProtocolViolation,
            Message = "Worker command reply did not contain a public reply payload.",
        },
    };

    if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
    {
        // Protobuf string fields default to "" (never null). So a plain `??` would only
        // fall back to DiagnosticMessage when ProtocolStatus itself is missing. A status
        // with an empty Message would then discard a populated DiagnosticMessage. Fall
        // back on blank text instead.
        string? message = reply.ProtocolStatus?.Message;
        if (string.IsNullOrWhiteSpace(message))
        {
            message = reply.DiagnosticMessage;
        }

        throw new SessionManagerException(
            SessionManagerErrorCode.SessionNotReady,
            string.IsNullOrWhiteSpace(message) ? "Bulk MXAccess command failed." : message);
    }

    return reply;
}

/// <summary>
/// Returns the worker client iff both the gateway-side session state AND
/// the worker client's own state are <see cref="SessionState.Ready"/> /
/// <see cref="WorkerClientState.Ready"/>. The two states can diverge under
/// load: <c>_state</c> only transitions on gateway-driven events (open,
/// close, fault), while <see cref="IWorkerClient.State"/> can shift on
/// worker-side signals (heartbeat watchdog, pipe disconnect) before the
/// gateway's session-level reaction observes them. When that happens, the
/// in-flight RPC fails fast here with both states surfaced in the
/// diagnostic (Server-030), so the actual mismatch is actionable instead
/// of misleading. The session usually transitions to Faulted
/// shortly after.
/// </summary>
/// <exception cref="SessionManagerException">
/// Thrown with <see cref="SessionManagerErrorCode.SessionNotReady"/> when either state is not Ready.
/// </exception>
private IWorkerClient GetReadyWorkerClient()
{
    lock (_syncRoot)
    {
        if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
        {
            string workerState = _workerClient is null ? "" : _workerClient.State.ToString();
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotReady,
                $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
        }

        return _workerClient;
    }
}

// Records (or overwrites) the registration for (serverHandle, itemHandle).
// A zero item handle or a blank tag address is ignored.
// NOTE(review): _items is mutated here without taking _syncRoot; presumably callers
// already hold it — confirm.
private void TrackItem(
    int serverHandle,
    int itemHandle,
    string tagAddress)
{
    if (itemHandle == 0 || string.IsNullOrWhiteSpace(tagAddress))
    {
        return;
    }

    _items[(serverHandle, itemHandle)] = new SessionItemRegistration(serverHandle, itemHandle, tagAddress);
}

// Tracks every successful result from a bulk subscribe reply; failed results are skipped.
private void TrackBulkItems(BulkSubscribeReply reply)
{
    foreach (SubscribeResult result in reply.Results)
    {
        if (result.WasSuccessful)
        {
            TrackItem(result.ServerHandle, result.ItemHandle, result.TagAddress);
        }
    }
}

// Drops the registrations for the given item handles under serverHandle; handles that
// were never tracked are ignored.
private void RemoveItems(
    int serverHandle,
    IEnumerable itemHandles)
{
    foreach (int itemHandle in itemHandles)
    {
        _items.Remove((serverHandle, itemHandle));
    }
}

// Decrements the active event-subscriber count under _syncRoot, never going below zero.
private void DetachEventSubscriber()
{
    lock (_syncRoot)
    {
        if (_activeEventSubscriberCount > 0)
        {
            _activeEventSubscriberCount--;
        }
    }
}

// Session-level subscriber lease: wraps the distributor's lease and ties its lifetime to
// the session's active-subscriber count.
private sealed class EventSubscriberLease(GatewaySession session, IEventSubscriberLease distributorLease)
    : IEventSubscriberLease
{
    // 0 = live, 1 = disposed. Interlocked, so the concurrent stream-completion and
    // client-cancellation paths cannot both call DetachEventSubscriber and
    // double-decrement _activeEventSubscriberCount.
    private int _leaseDisposed;

    /// <inheritdoc/>
    public System.Threading.Channels.ChannelReader Reader => distributorLease.Reader;

    /// <summary>
    /// Disposes the lease. It unregisters this subscriber from the distributor (completing
    /// its channel) and decrements the session's active-subscriber count. The ordering is
    /// not significant, because the count guard and the distributor registration are
    /// independent, but both must run exactly once.
    /// </summary>
    /// <remarks>
    /// The count is decremented even if the distributor lease throws. The disposed flag is
    /// already set at that point, so a retry could never reach the decrement again, and
    /// skipping it would leave the session's subscriber count permanently inflated. The
    /// distributor's exception still propagates to the caller.
    /// </remarks>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _leaseDisposed, 1) != 0)
        {
            return;
        }

        try
        {
            distributorLease.Dispose();
        }
        finally
        {
            session.DetachEventSubscriber();
        }
    }
}
}