docs: add LmxProxy requirements documentation with v2 protocol as authoritative design
Generate high-level requirements and 10 component documents derived from source code and protocol specs. Uses lmxproxy_updates.md (v2 TypedValue/QualityCode) as the source of truth, with v1 string-based encoding documented as legacy context. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
116
lmxproxy/docs/requirements/Component-SubscriptionManager.md
Normal file
116
lmxproxy/docs/requirements/Component-SubscriptionManager.md
Normal file
@@ -0,0 +1,116 @@
|
||||
# Component: SubscriptionManager
|
||||
|
||||
## Purpose
|
||||
|
||||
Manages the lifecycle of tag value subscriptions, multiplexing multiple client subscriptions onto shared MXAccess tag subscriptions and delivering updates via per-client bounded channels with configurable backpressure.
|
||||
|
||||
## Location
|
||||
|
||||
`src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs`
|
||||
|
||||
## Responsibilities
|
||||
|
||||
- Create per-client subscription channels with bounded capacity.
|
||||
- Share underlying MXAccess tag subscriptions across multiple clients subscribing to the same tags.
|
||||
- Deliver tag value updates from MXAccess callbacks to all subscribed clients.
|
||||
- Handle backpressure when client channels are full (DropOldest, DropNewest, or Wait).
|
||||
- Clean up subscriptions on client disconnect.
|
||||
- Notify all subscribed clients with bad quality when MXAccess disconnects.
|
||||
|
||||
## 1. Architecture
|
||||
|
||||
### 1.1 Per-Client Channels
|
||||
|
||||
Each subscribing client gets a bounded `System.Threading.Channel<(string address, Vtq vtq)>`:
|
||||
- Capacity: configurable (default 1000 messages).
|
||||
- Full mode: configurable (default `DropOldest`).
|
||||
- `SingleReader = true`, `SingleWriter = false`.
|
||||
|
||||
### 1.2 Shared Tag Subscriptions
|
||||
|
||||
Tag subscriptions to MXAccess are shared across clients:
|
||||
- When the first client subscribes to a tag, a new MXAccess subscription is created.
|
||||
- When additional clients subscribe to the same tag, they are added to the existing tag subscription's client set.
|
||||
- When the last client unsubscribes from a tag, the MXAccess subscription is disposed.
|
||||
|
||||
### 1.3 Thread Safety
|
||||
|
||||
- `ReaderWriterLockSlim` protects tag subscription updates.
|
||||
- `ConcurrentDictionary` for client subscription tracking.
|
||||
|
||||
## 2. Subscription Flow
|
||||
|
||||
### 2.1 Subscribe
|
||||
|
||||
`SubscribeAsync(clientId, addresses, ct)`:
|
||||
|
||||
1. Creates a bounded channel with configured capacity and full mode.
|
||||
2. Creates a `ClientSubscription` record (clientId, channel, address set, CancellationTokenSource, counters).
|
||||
3. For each tag address:
|
||||
- If the tag already has a subscription, adds the client to the existing `TagSubscription.clientIds` set.
|
||||
- Otherwise, creates a new `TagSubscription` and calls `_scadaClient.SubscribeAsync()` to register with MXAccess (outside the lock to avoid blocking).
|
||||
4. Registers a cancellation token callback to automatically call `UnsubscribeClient` on disconnect.
|
||||
5. Returns the channel reader for the GrpcServer to stream from.
|
||||
|
||||
### 2.2 Value Updates
|
||||
|
||||
`OnTagValueChanged(address, Vtq)` — called from MxAccessClient's COM event handler:
|
||||
|
||||
1. Looks up the tag subscription to find all subscribed clients.
|
||||
2. For each client, calls `channel.Writer.TryWrite((address, vtq))`.
|
||||
3. If the channel is full:
|
||||
- **DropOldest**: Logs a warning, increments `DroppedMessageCount`. The oldest message is automatically discarded by the channel.
|
||||
- **DropNewest**: Drops the incoming message.
|
||||
- **Wait**: Blocks the writer until space is available (not recommended for gRPC streaming).
|
||||
4. On channel closed (client disconnected), schedules `UnsubscribeClient` cleanup.
|
||||
|
||||
### 2.3 Unsubscribe
|
||||
|
||||
`UnsubscribeClient(clientId)`:
|
||||
|
||||
1. Removes the client from the client dictionary.
|
||||
2. For each tag the client was subscribed to, removes the client from the tag's subscriber set.
|
||||
3. If a tag has no remaining subscribers, disposes the MXAccess subscription handle.
|
||||
4. Completes the client's channel writer (signals end of stream).
|
||||
|
||||
## 3. Backpressure
|
||||
|
||||
| Mode | Behavior | Use Case |
|
||||
|------|----------|----------|
|
||||
| DropOldest | Silently discards oldest message when channel is full | Default. Fire-and-forget semantics. No client blocking. |
|
||||
| DropNewest | Drops the incoming message when channel is full | Preserves history, drops latest updates. |
|
||||
| Wait | Blocks the writer until space is available | Not recommended for gRPC streaming (blocks callback thread). |
|
||||
|
||||
Per-client statistics track `DeliveredMessageCount` and `DroppedMessageCount` for monitoring via the status dashboard.
|
||||
|
||||
## 4. Disconnection Handling
|
||||
|
||||
### 4.1 Client Disconnect
|
||||
|
||||
When a client's gRPC stream ends (cancellation or error), the cancellation token callback triggers `UnsubscribeClient`, which cleans up all tag subscriptions for that client.
|
||||
|
||||
### 4.2 MxAccess Disconnect
|
||||
|
||||
`OnConnectionStateChanged` — when the MxAccess connection drops:
|
||||
- Sends a bad-quality Vtq to all subscribed clients via their channels.
|
||||
- Each client receives an async notification of the connection loss.
|
||||
- Tag subscriptions are retained in memory for reconnection (via MxAccessClient's `_storedSubscriptions`).
|
||||
|
||||
## 5. Statistics
|
||||
|
||||
`GetSubscriptionStats()` returns:
|
||||
- `TotalClients` — number of active client subscriptions.
|
||||
- `TotalTags` — number of unique tags with active MXAccess subscriptions.
|
||||
- `ActiveSubscriptions` — total client-tag subscription count.
|
||||
|
||||
## Dependencies
|
||||
|
||||
- **MxAccessClient** (IScadaClient) — creates and disposes MXAccess tag subscriptions.
|
||||
- **Configuration** — `SubscriptionConfiguration` for channel capacity and full mode.
|
||||
|
||||
## Interactions
|
||||
|
||||
- **GrpcServer** calls `SubscribeAsync` on Subscribe RPC and reads from the returned channel.
|
||||
- **MxAccessClient** delivers value updates via the `OnTagValueChanged` callback.
|
||||
- **HealthAndMetrics** reads subscription statistics for health checks and status reports.
|
||||
- **ServiceHost** disposes the SubscriptionManager at shutdown.
|
||||
Reference in New Issue
Block a user