Notes and documentation covering actors, remoting, clustering, persistence, streams, serialization, hosting, testing, and best practices for the Akka.NET framework used throughout the ScadaLink system.
# 10 — Streams (Akka.Streams)
## Overview

Akka.Streams provides a higher-level abstraction for building reactive, back-pressured data processing pipelines on top of the actor model. Streams compose `Source`, `Flow`, and `Sink` stages into graphs that handle flow control automatically — fast producers cannot overwhelm slow consumers.

In the SCADA system, Streams is the natural fit for processing continuous tag subscription data from equipment. Both OPC-UA and the custom protocol support tag subscriptions that push data continuously. Streams provides backpressure, batching, and error handling for these data flows without hand-rolled actor mailbox management.
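
The composition model can be sketched with a trivial linear pipeline (a sketch only — it assumes an `ActorSystem` and a `materializer` already exist in scope):

```csharp
// Source (numbers) → Flow (keep even values) → Sink (print).
// Backpressure is negotiated automatically at every stage boundary.
Source.From(Enumerable.Range(1, 100))
    .Via(Flow.Create<int>().Where(n => n % 2 == 0))
    .RunWith(Sink.ForEach<int>(Console.WriteLine), materializer);
```

The same three stage types appear in every pipeline below; only the element types and the stage logic change.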
## When to Use
- Processing continuous tag value feeds from equipment subscriptions
- Batching tag updates for historian writes (buffering individual updates and writing in bulk to SQL Server)
- Transforming and filtering raw protocol data into domain events (e.g., raw Modbus registers → typed tag values)
- Connecting device data flows to Pub-Sub for alarm/event distribution
- Any pipeline where backpressure is important (producer faster than consumer)
## When Not to Use
- Simple request/response patterns — use actors directly
- Command dispatch (point-to-point, needs acknowledgment) — use actors with Persistence
- Control flow logic (state machines, lifecycle management) — use actor `Become`/`Unbecome`
## Design Decisions for the SCADA System
### Tag Subscription Pipeline

Each device actor can host a stream that processes tag subscription updates from the protocol adapter:
```
Equipment (OPC-UA / Custom Protocol)
  → Source: Tag subscription callback
  → Flow: Parse raw values, apply deadband filtering
  → Flow: Enrich with device metadata (device ID, tag name, timestamp)
  → Sink: Forward to device actor mailbox (for state update)
  → Sink: Publish to Pub-Sub (for alarm processing, historian)
```
### Source: Wrapping Protocol Callbacks

Both OPC-UA and the custom protocol deliver tag updates via callbacks. Wrap these in a `Source` using `Source.Queue` or `Source.ActorRef`:
```csharp
// Using Source.Queue for backpressured ingestion
var (queue, source) = Source
    .Queue<RawTagUpdate>(bufferSize: 1000, OverflowStrategy.DropHead)
    .PreMaterialize(materializer);

// Protocol callback pushes to the queue
opcUaSubscription.OnTagChanged += (tag, value) =>
    queue.OfferAsync(new RawTagUpdate(tag, value, DateTime.UtcNow));
```
`OverflowStrategy.DropHead` drops the oldest update when the buffer is full — appropriate for tag values where only the latest matters.

### Flow: Deadband Filtering

Reduce noise by filtering out tag updates that haven't changed significantly:
```csharp
Flow<TagUpdate, TagUpdate, NotUsed> deadbandFilter =
    Flow.Create<TagUpdate>()
        .StatefulSelectMany(() =>
        {
            var lastValues = new Dictionary<string, double>();
            return update =>
            {
                if (update.Value is double current)
                {
                    if (lastValues.TryGetValue(update.TagName, out var last)
                        && Math.Abs(current - last) < update.Deadband)
                    {
                        return Enumerable.Empty<TagUpdate>(); // Filtered out
                    }
                    lastValues[update.TagName] = current;
                }
                return new[] { update }; // Non-numeric values always pass through
            };
        });
```
### Sink: Batched Historian Writes

Buffer tag updates and write to the historian database in batches:
```csharp
Sink<TagUpdate, NotUsed> historianSink =
    Flow.Create<TagUpdate>()
        .GroupedWithin(500, TimeSpan.FromSeconds(2)) // max 500 elements or 2 s
        .SelectAsync(parallelism: 2, batch => WriteToHistorian(batch))
        .To(Sink.Ignore<int>());
```
This batches up to 500 updates or flushes every 2 seconds, whichever comes first. `parallelism: 2` allows two concurrent database writes.

### Materializer Lifecycle

Create one `ActorMaterializer` per `ActorSystem` (or per top-level actor). Do not create a materializer per device actor — this wastes resources:
```csharp
// In the Device Manager singleton
var materializer = Context.Materializer(); // Tied to this actor's lifecycle
```
## Common Patterns

### Fan-Out: One Source, Multiple Sinks

A single tag subscription feed often needs to go to multiple consumers (device state, historian, alarm checker). Use `Broadcast`:
```csharp
var graph = GraphDsl.Create(builder =>
{
    var broadcast = builder.Add(new Broadcast<TagUpdate>(3));
    var source = builder.Add(tagSubscriptionSource);
    var stateSink = builder.Add(deviceStateSink);
    var historianSink = builder.Add(batchedHistorianSink);
    var alarmSink = builder.Add(alarmCheckSink);

    builder.From(source).To(broadcast);
    builder.From(broadcast.Out(0)).To(stateSink);
    builder.From(broadcast.Out(1)).To(historianSink);
    builder.From(broadcast.Out(2)).To(alarmSink);

    return ClosedShape.Instance;
});

RunnableGraph.FromGraph(graph).Run(materializer);
```
### Error Handling with Supervision

Stream stages can fail (protocol errors, serialization issues). Use stream supervision to handle errors without killing the entire pipeline:
```csharp
// A Decider is a Func<Exception, Directive> mapping failures to actions
var settings = ActorMaterializerSettings.Create(system)
    .WithSupervisionStrategy(cause => cause switch
    {
        FormatException => Directive.Resume,   // Skip the bad element and continue
        TimeoutException => Directive.Restart, // Restart the stage, clearing its state
        _ => Directive.Resume                  // Default: skip the failed element and continue
    });
```
### Throttling High-Frequency Updates

If equipment sends updates faster than the system can process, throttle the stream:
```csharp
Flow.Create<TagUpdate>()
    .Throttle(elements: 100, per: TimeSpan.FromSeconds(1), maximumBurst: 50, ThrottleMode.Shaping)
```
## Anti-Patterns

### Building Complex Business Logic in Streams
Streams excel at data flow — transforming, filtering, routing, batching. Do not embed complex business logic (alarm escalation rules, command sequencing, state machine transitions) in stream stages. Keep that in actors where state management and supervision are more natural.
### Unbounded Buffers
Never use `OverflowStrategy.Backpressure` with an effectively unbounded buffer when the source is a protocol callback. If the protocol pushes faster than the stream processes, backpressure propagates to the protocol callback — which typically can't handle it (it's a fire-and-forget callback). Use `DropHead` or `DropTail` for tag updates where only recent values matter.
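
The difference is a single argument to `Source.Queue`; as a sketch (reusing the `RawTagUpdate` type from the pipeline above):

```csharp
// Risky for callback sources: when the buffer fills, offers are parked and
// the unawaited OfferAsync tasks accumulate without bound
var risky = Source.Queue<RawTagUpdate>(bufferSize: 1000, OverflowStrategy.Backpressure);

// Safe for tag values: shed the oldest reading, keep the newest
var safe = Source.Queue<RawTagUpdate>(bufferSize: 1000, OverflowStrategy.DropHead);
```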
### Creating Streams Per-Message
Do not materialize a new stream for each incoming message. Stream materialization has overhead (actor creation, buffer allocation). Materialize the stream once (at actor startup) and keep it running for the lifetime of the device actor.
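
A sketch of the intended shape, assuming a hypothetical `DeviceActor` that ingests `RawTagUpdate`s:

```csharp
public class DeviceActor : ReceiveActor
{
    private ISourceQueueWithComplete<RawTagUpdate> _queue;

    protected override void PreStart()
    {
        // Materialize once, at startup; the queue handle lives as long as the actor
        _queue = Source.Queue<RawTagUpdate>(1000, OverflowStrategy.DropHead)
            .To(Sink.ForEach<RawTagUpdate>(ProcessUpdate))
            .Run(Context.Materializer());
    }

    private void ProcessUpdate(RawTagUpdate update) { /* ... */ }
}
```

Incoming messages then push into the long-lived `_queue` rather than spinning up a fresh graph each time.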
### Ignoring Materialized Values
When a stream completes or fails, the materialized value (a `Task`) completes. Monitor this to detect stream failures:
```csharp
var completionTask = stream.Run(materializer);
var self = Self; // Capture — Self is not safe to access from another thread

completionTask.ContinueWith(t =>
{
    if (t.IsFaulted)
        self.Tell(new StreamFailed(t.Exception));
}, TaskScheduler.Default);
```
## Configuration Guidance
```hocon
akka.stream {
  # Default materializer settings
  materializer {
    # Initial and maximum buffer sizes for stream stages
    initial-input-buffer-size = 4
    max-input-buffer-size = 16

    # Dispatcher for stream actors
    dispatcher = "" # Uses default dispatcher

    # Subscription timeout — how long a publisher waits for a subscriber
    subscription-timeout {
      mode = cancel
      timeout = 5s
    }
  }
}
```
For the SCADA system, the default buffer sizes are fine. If profiling shows that specific high-throughput flows (e.g., historian batching) need larger buffers, increase `max-input-buffer-size` selectively by passing custom `ActorMaterializerSettings` to those flows.
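
For example (variable names illustrative), a dedicated materializer with larger input buffers can serve just the historian graph:

```csharp
// Larger buffers only for the high-throughput historian pipeline;
// everything else keeps the system-wide defaults
var historianMaterializer = ActorMaterializer.Create(system,
    ActorMaterializerSettings.Create(system)
        .WithInputBuffer(initialSize: 64, maxSize: 256));

historianGraph.Run(historianMaterializer);
```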
## References
- Official Documentation: <https://getakka.net/articles/streams/introduction.html>
- Stream Quickstart: <https://getakka.net/articles/streams/quickstart.html>
- Modularity & Composition: <https://getakka.net/articles/streams/modularitycomposition.html>
- Buffers & Working with Rate: <https://getakka.net/articles/streams/buffersandworkingwithrate.html>