scadalink-design/AkkaDotNet/10-Streams.md
Joseph Doherty de636b908b Add Akka.NET reference documentation
Notes and documentation covering actors, remoting, clustering, persistence,
streams, serialization, hosting, testing, and best practices for the Akka.NET
framework used throughout the ScadaLink system.
2026-03-16 09:08:17 -04:00


# 10 — Streams (Akka.Streams)
## Overview
Akka.Streams provides a higher-level abstraction for building reactive, back-pressured data processing pipelines on top of the actor model. Streams compose `Source`, `Flow`, and `Sink` stages into graphs that handle flow control automatically — fast producers cannot overwhelm slow consumers.
In the SCADA system, Streams is the natural fit for processing continuous tag subscription data from equipment. Both OPC-UA and the custom protocol support tag subscriptions that push data continuously. Streams provides backpressure, batching, and error handling for these data flows without hand-rolled mailbox and flow-control management.
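As a minimal sketch of the composition model (the element values here are illustrative, not SCADA data), a linear pipeline wires a `Source` through a `Flow`-style operator into a `Sink`, and backpressure is handled end to end:

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("scada");
var materializer = system.Materializer();

// Source -> Flow -> Sink: a fast source cannot outrun the sink.
Source.From(new[] { 1, 2, 3, 4, 5 })       // Source: produces elements
    .Select(x => x * 10)                   // Flow stage: transforms elements
    .RunForeach(Console.WriteLine,         // Sink: consumes elements
        materializer);
```

The same shape scales from this toy example to the tag pipelines below; only the stages change.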
## When to Use
- Processing continuous tag value feeds from equipment subscriptions
- Batching tag updates for historian writes (buffering individual updates and writing in bulk to SQL Server)
- Transforming and filtering raw protocol data into domain events (e.g., raw Modbus registers → typed tag values)
- Connecting device data flows to Pub-Sub for alarm/event distribution
- Any pipeline where backpressure is important (producer faster than consumer)
## When Not to Use
- Simple request/response patterns — use actors directly
- Command dispatch (point-to-point, needs acknowledgment) — use actors with Persistence
- Control flow logic (state machines, lifecycle management) — use actor `Become`/`Unbecome`
## Design Decisions for the SCADA System
### Tag Subscription Pipeline
Each device actor can host a stream that processes tag subscription updates from the protocol adapter:
```
Equipment (OPC-UA / Custom Protocol)
→ Source: Tag subscription callback
→ Flow: Parse raw values, apply deadband filtering
→ Flow: Enrich with device metadata (device ID, tag name, timestamp)
→ Sink: Forward to device actor mailbox (for state update)
→ Sink: Publish to Pub-Sub (for alarm processing, historian)
```
### Source: Wrapping Protocol Callbacks
Both OPC-UA and the custom protocol deliver tag updates via callbacks. Wrap these in a `Source` using `Source.Queue` or `Source.ActorRef`:
```csharp
// Using Source.Queue for backpressured ingestion
var (queue, source) = Source
    .Queue&lt;RawTagUpdate&gt;(bufferSize: 1000, OverflowStrategy.DropHead)
    .PreMaterialize(materializer);

// Protocol callback pushes to the queue
opcUaSubscription.OnTagChanged += (tag, value) =>
    queue.OfferAsync(new RawTagUpdate(tag, value, DateTime.UtcNow));
```
`OverflowStrategy.DropHead` drops the oldest update when the buffer is full — appropriate for tag values where only the latest matters.
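Drops are silent unless observed: `OfferAsync` returns a `Task<IQueueOfferResult>` describing what happened to each element. A sketch of inspecting it (the `log` and `DetachCallback` calls are hypothetical placeholders):

```csharp
// Inspect the queue's verdict for one offered element.
var result = await queue.OfferAsync(update);

if (result is QueueOfferResult.Dropped)
{
    // Buffer was full and DropHead evicted an element; worth counting/logging.
    log.Warning("Tag update dropped: ingestion buffer full");
}
else if (result is QueueOfferResult.QueueClosed || result is QueueOfferResult.Failure)
{
    // The stream has completed or failed; stop pushing into it.
    DetachCallback(); // hypothetical: unhook the protocol subscription
}
```

Tracking the `Dropped` count gives an early signal that buffer sizing or downstream throughput needs attention.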
### Flow: Deadband Filtering
Reduce noise by filtering out tag updates that haven't changed significantly:
```csharp
Flow&lt;TagUpdate, TagUpdate, NotUsed&gt; deadbandFilter =
    Flow.Create&lt;TagUpdate&gt;()
        .StatefulSelectMany(() =>
        {
            var lastValues = new Dictionary&lt;string, double&gt;();
            // Explicit delegate type keeps C# type inference happy.
            Func&lt;TagUpdate, IEnumerable&lt;TagUpdate&gt;&gt; filter = update =>
            {
                if (update.Value is double current)
                {
                    if (lastValues.TryGetValue(update.TagName, out var last)
                        && Math.Abs(current - last) < update.Deadband)
                    {
                        return Enumerable.Empty&lt;TagUpdate&gt;(); // Within deadband: filtered out
                    }
                    lastValues[update.TagName] = current;
                }
                return new[] { update }; // Changed enough, or non-numeric: pass through
            };
            return filter;
        });
```
### Sink: Batched Historian Writes
Buffer tag updates and write to the historian database in batches:
```csharp
Sink&lt;TagUpdate, NotUsed&gt; historianSink =
    Flow.Create&lt;TagUpdate&gt;()
        .GroupedWithin(500, TimeSpan.FromSeconds(2)) // up to 500 elements or 2 seconds
        .SelectAsync(parallelism: 2, batch => WriteToHistorian(batch))
        .To(Sink.Ignore&lt;int&gt;());
```
This batches up to 500 updates or flushes every 2 seconds, whichever comes first. `parallelism: 2` allows two concurrent database writes.
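`WriteToHistorian` is not shown in this document. A hypothetical shape, assuming a SQL Server table named `TagHistory` and `SqlBulkCopy` for the bulk insert (table, column, and connection-string names are all illustrative):

```csharp
// Requires System.Data and Microsoft.Data.SqlClient.
// Returns the row count so the downstream Sink.Ignore<int>() type matches.
private static async Task<int> WriteToHistorian(IEnumerable<TagUpdate> batch)
{
    var table = new DataTable();
    table.Columns.Add("TagName", typeof(string));
    table.Columns.Add("Value", typeof(double));
    table.Columns.Add("Timestamp", typeof(DateTime));
    foreach (var u in batch)
        table.Rows.Add(u.TagName, u.Value, u.Timestamp);

    using var conn = new SqlConnection(historianConnectionString);
    await conn.OpenAsync();
    using var bulk = new SqlBulkCopy(conn) { DestinationTableName = "TagHistory" };
    await bulk.WriteToServerAsync(table);
    return table.Rows.Count;
}
```

Because `SelectAsync` bounds parallelism, a slow database write backpressures the batching stage rather than piling up unbounded work.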
### Materializer Lifecycle
Create one `ActorMaterializer` per `ActorSystem` (or per top-level actor). Do not create a materializer per device actor — this wastes resources:
```csharp
// In the Device Manager singleton
var materializer = Context.Materializer(); // Tied to this actor's lifecycle
```
## Common Patterns
### Fan-Out: One Source, Multiple Sinks
A single tag subscription feed often needs to go to multiple consumers (device state, historian, alarm checker). Use `Broadcast`:
```csharp
var graph = GraphDsl.Create(builder =>
{
    var broadcast = builder.Add(new Broadcast&lt;TagUpdate&gt;(3));
    var source = builder.Add(tagSubscriptionSource);
    var stateSink = builder.Add(deviceStateSink);
    var historianSink = builder.Add(batchedHistorianSink);
    var alarmSink = builder.Add(alarmCheckSink);

    builder.From(source).To(broadcast);
    builder.From(broadcast.Out(0)).To(stateSink);
    builder.From(broadcast.Out(1)).To(historianSink);
    builder.From(broadcast.Out(2)).To(alarmSink);

    return ClosedShape.Instance;
});

RunnableGraph.FromGraph(graph).Run(materializer);
```
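When the sinks are simple, the GraphDsl ceremony can often be avoided with `AlsoTo`, which attaches a side sink at a point in the flow. A sketch reusing the sink names from the graph above:

```csharp
// Each AlsoTo adds one branch; the final RunWith attaches the primary sink.
tagSubscriptionSource
    .AlsoTo(batchedHistorianSink)
    .AlsoTo(alarmCheckSink)
    .RunWith(deviceStateSink, materializer);
```

Both forms apply backpressure from the slowest branch; reach for GraphDsl when the topology needs merges, balances, or cycles rather than plain fan-out.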
### Error Handling with Supervision
Stream stages can fail (protocol errors, serialization issues). Use stream supervision to handle errors without killing the entire pipeline:
```csharp
// A Decider maps an exception to a supervision directive.
Akka.Streams.Supervision.Decider decider = cause =>
    cause is FormatException ? Directive.Resume     // bad sample: skip it
    : cause is TimeoutException ? Directive.Restart // restart the stage, clearing its state
    : Directive.Resume;                             // default: skip the failed element and continue

var settings = ActorMaterializerSettings.Create(system)
    .WithSupervisionStrategy(decider);
```
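Supervision can also be scoped to a single flow instead of the whole materializer, via `ActorAttributes.CreateSupervisionStrategy`. A sketch (the `ParseRawValue` function is a hypothetical stage that may throw `FormatException`):

```csharp
// Only this flow resumes on parse errors; other streams keep the
// materializer-wide strategy.
var parsingFlow = Flow.Create<RawTagUpdate>()
    .Select(ParseRawValue)
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(
        cause => cause is FormatException
            ? Directive.Resume
            : Directive.Stop));
```

Scoping the strategy keeps a lenient policy (skip malformed protocol frames) from masking genuine failures elsewhere in the pipeline.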
### Throttling High-Frequency Updates
If equipment sends updates faster than the system can process, throttle the stream:
```csharp
Flow.Create&lt;TagUpdate&gt;()
    .Throttle(elements: 100, per: TimeSpan.FromSeconds(1), maximumBurst: 50, ThrottleMode.Shaping)
```
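For tag values where only the latest reading matters, `Conflate` is often a better fit than a fixed rate cap: it collapses backlogged elements whenever the consumer is slower, and passes everything through when it keeps up. A sketch:

```csharp
// When downstream is slow, keep only the freshest update per tag batch;
// when downstream keeps up, nothing is discarded.
Flow.Create<TagUpdate>()
    .Conflate((stale, fresh) => fresh)
```

Throttling enforces a hard rate regardless of consumer speed; conflation adapts to it. Both keep memory bounded.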
## Anti-Patterns
### Building Complex Business Logic in Streams
Streams excel at data flow — transforming, filtering, routing, batching. Do not embed complex business logic (alarm escalation rules, command sequencing, state machine transitions) in stream stages. Keep that in actors where state management and supervision are more natural.
### Unbounded Buffers
Never use `OverflowStrategy.Backpressure` with an effectively unbounded buffer when the source is a protocol callback. If the protocol pushes faster than the stream processes, backpressure propagates to the protocol callback — which typically can't handle it (it's a fire-and-forget callback). Use `DropHead` or `DropTail` for tag updates where only recent values matter.
### Creating Streams Per-Message
Do not materialize a new stream for each incoming message. Stream materialization has overhead (actor creation, buffer allocation). Materialize the stream once (at actor startup) and keep it running for the lifetime of the device actor.
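A sketch of the recommended shape (type and member names are illustrative): materialize once in `PreStart`, then feed the long-lived queue from the actor's receive handlers:

```csharp
public class DeviceActor : ReceiveActor
{
    private ISourceQueueWithComplete<RawTagUpdate> _queue;

    public DeviceActor()
    {
        // Each message is offered to the already-running stream;
        // no per-message materialization.
        Receive<RawTagUpdate>(update => { _queue.OfferAsync(update); });
    }

    protected override void PreStart()
    {
        var materializer = Context.Materializer(); // dies with this actor
        _queue = Source.Queue<RawTagUpdate>(1000, OverflowStrategy.DropHead)
            .To(Sink.ForEach<RawTagUpdate>(ProcessUpdate))
            .Run(materializer); // materialized exactly once, at startup
    }

    private void ProcessUpdate(RawTagUpdate update) { /* domain logic */ }
}
```

Because the materializer comes from `Context`, the stream's internal actors are tied to the device actor's lifecycle and are cleaned up when it stops.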
### Ignoring Materialized Values
When a stream completes or fails, the materialized value (a `Task`) completes. Monitor this to detect stream failures:
```csharp
// Capture Self first: Context/Self are not accessible from the
// thread-pool thread the continuation runs on.
var self = Self;
var completionTask = stream.Run(materializer);
completionTask.ContinueWith(t =>
{
    if (t.IsFaulted)
        self.Tell(new StreamFailed(t.Exception));
}, TaskScheduler.Default);
```
## Configuration Guidance
```hocon
akka.stream {
  # Default materializer settings
  materializer {
    # Initial and maximum buffer sizes for stream stages
    initial-input-buffer-size = 4
    max-input-buffer-size = 16

    # Dispatcher for stream actors
    dispatcher = "" # Uses default dispatcher

    # Subscription timeout — how long a publisher waits for a subscriber
    subscription-timeout {
      mode = cancel
      timeout = 5s
    }
  }
}
```
For the SCADA system, the default buffer sizes are fine. If profiling shows that specific high-throughput flows (e.g., historian batching) need larger buffers, increase `max-input-buffer-size` selectively by passing custom `ActorMaterializerSettings` to those flows.
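Per-stream tuning can also be expressed inline with `Attributes.CreateInputBuffer`, rather than constructing a whole settings object. A sketch (the buffer sizes are illustrative, to be chosen from profiling):

```csharp
// Larger input buffer for the high-throughput historian flow only;
// arguments are (initial size, max size), powers of two.
var tunedHistorianFlow = Flow.Create<TagUpdate>()
    .GroupedWithin(500, TimeSpan.FromSeconds(2))
    .WithAttributes(Attributes.CreateInputBuffer(16, 64));
```

Larger buffers smooth bursts at the cost of memory and latency, so widening them globally is rarely the right trade.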
## References
- Official Documentation: <https://getakka.net/articles/streams/introduction.html>
- Stream Quickstart: <https://getakka.net/articles/streams/quickstart.html>
- Modularity & Composition: <https://getakka.net/articles/streams/modularitycomposition.html>
- Buffers & Working with Rate: <https://getakka.net/articles/streams/buffersandworkingwithrate.html>