scadalink-design/AkkaDotNet/10-Streams.md
Joseph Doherty de636b908b Add Akka.NET reference documentation
Notes and documentation covering actors, remoting, clustering, persistence,
streams, serialization, hosting, testing, and best practices for the Akka.NET
framework used throughout the ScadaLink system.
2026-03-16 09:08:17 -04:00


10 — Streams (Akka.Streams)

Overview

Akka.Streams provides a higher-level abstraction for building reactive, back-pressured data processing pipelines on top of the actor model. Streams compose Source, Flow, and Sink stages into graphs that handle flow control automatically — fast producers cannot overwhelm slow consumers.

In the SCADA system, Streams is the natural fit for processing continuous tag subscription data from equipment. Both OPC-UA and the custom protocol support tag subscriptions that push data continuously. Streams provides backpressure, batching, and error handling for these data flows without writing manual actor mailbox management.

When to Use

  • Processing continuous tag value feeds from equipment subscriptions
  • Batching tag updates for historian writes (buffering individual updates and writing in bulk to SQL Server)
  • Transforming and filtering raw protocol data into domain events (e.g., raw Modbus registers → typed tag values)
  • Connecting device data flows to Pub-Sub for alarm/event distribution
  • Any pipeline where backpressure is important (producer faster than consumer)

When Not to Use

  • Simple request/response patterns — use actors directly
  • Command dispatch (point-to-point, needs acknowledgment) — use actors with Persistence
  • Control flow logic (state machines, lifecycle management) — use actor Become/Unbecome

Design Decisions for the SCADA System

Tag Subscription Pipeline

Each device actor can host a stream that processes tag subscription updates from the protocol adapter:

Equipment (OPC-UA / Custom Protocol)
  → Source: Tag subscription callback
  → Flow: Parse raw values, apply deadband filtering
  → Flow: Enrich with device metadata (device ID, tag name, timestamp)
  → Sink: Forward to device actor mailbox (for state update)
  → Sink: Publish to Pub-Sub (for alarm processing, historian)
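Composed with the fluent DSL, the stages above might look like the following sketch. `ParseTagValue`, `Enrich`, `deviceId`, and the source/sink variables are hypothetical placeholders; fanning out to a second sink requires a `Broadcast` stage, shown under Common Patterns below.

```csharp
// Linear sketch of the tag subscription pipeline; helper names are assumed.
var pipeline = tagSubscriptionSource                 // Source<RawTagUpdate, _>
    .Select(raw => ParseTagValue(raw))               // parse raw protocol values
    .Via(deadbandFilter)                             // drop insignificant changes
    .Select(update => Enrich(update, deviceId))      // attach device metadata
    .To(historianSink);                              // batched historian writes

pipeline.Run(materializer);
```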

Source: Wrapping Protocol Callbacks

Both OPC-UA and the custom protocol deliver tag updates via callbacks. Wrap these in a Source using Source.Queue or Source.ActorRef:

// Using Source.Queue for backpressured ingestion
var (queue, source) = Source
    .Queue<RawTagUpdate>(bufferSize: 1000, OverflowStrategy.DropHead)
    .PreMaterialize(materializer);

// Protocol callback pushes to the queue
opcUaSubscription.OnTagChanged += (tag, value) =>
    queue.OfferAsync(new RawTagUpdate(tag, value, DateTime.UtcNow));

OverflowStrategy.DropHead drops the oldest update when the buffer is full — appropriate for tag values where only the latest matters.
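`OfferAsync` returns a `Task<IQueueOfferResult>` describing what happened to the element, which is useful when diagnosing unexpected drops. A sketch, assuming a `logger` is available:

```csharp
// Inspect the offer result to detect drops caused by a full buffer
opcUaSubscription.OnTagChanged += async (tag, value) =>
{
    var result = await queue.OfferAsync(new RawTagUpdate(tag, value, DateTime.UtcNow));
    if (result is QueueOfferResult.Dropped)
        logger.Debug("Update for tag {0} dropped: buffer full", tag);
};
```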

Flow: Deadband Filtering

Reduce noise by filtering out tag updates that haven't changed significantly:

Flow<TagUpdate, TagUpdate, NotUsed> deadbandFilter =
    Flow.Create<TagUpdate>()
        .StatefulSelectMany(() =>
        {
            // Per-materialization state: last emitted value for each tag
            var lastValues = new Dictionary<string, double>();
            return update =>
            {
                if (update.Value is double current)
                {
                    if (lastValues.TryGetValue(update.TagName, out var last)
                        && Math.Abs(current - last) < update.Deadband)
                    {
                        return Enumerable.Empty<TagUpdate>(); // Within deadband — filtered out
                    }
                    lastValues[update.TagName] = current;
                }
                return new[] { update }; // Non-numeric values always pass through
            };
        });

Sink: Batched Historian Writes

Buffer tag updates and write to the historian database in batches:

Sink<TagUpdate, NotUsed> historianSink =
    Flow.Create<TagUpdate>()
        .GroupedWithin(500, TimeSpan.FromSeconds(2))  // max 500 elements or 2 seconds
        .SelectAsync(parallelism: 2, batch => WriteToHistorian(batch))
        .To(Sink.Ignore<int>());

This batches up to 500 updates or flushes every 2 seconds, whichever comes first. parallelism: 2 allows two concurrent database writes.
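`WriteToHistorian` is not shown above; one plausible shape returns the row count that `Sink.Ignore<int>()` then discards. The table name, `connectionString`, and the `ToDataTable` helper are assumptions:

```csharp
// Hypothetical bulk-insert helper for a batch of tag updates
private async Task<int> WriteToHistorian(IEnumerable<TagUpdate> batch)
{
    var rows = batch.ToList();
    using var bulk = new SqlBulkCopy(connectionString)
    {
        DestinationTableName = "dbo.TagHistory"   // assumed table name
    };
    // ToDataTable: assumed helper mapping TagUpdate fields to table columns
    await bulk.WriteToServerAsync(ToDataTable(rows));
    return rows.Count;
}
```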

Materializer Lifecycle

Create one ActorMaterializer per ActorSystem (or per top-level actor). Do not create a materializer per device actor — this wastes resources:

// In the Device Manager singleton
var materializer = Context.Materializer();  // Tied to this actor's lifecycle

Common Patterns

Fan-Out: One Source, Multiple Sinks

A single tag subscription feed often needs to go to multiple consumers (device state, historian, alarm checker). Use Broadcast:

var graph = GraphDsl.Create(builder =>
{
    var broadcast = builder.Add(new Broadcast<TagUpdate>(3));
    var source = builder.Add(tagSubscriptionSource);
    var stateSink = builder.Add(deviceStateSink);
    var historianSink = builder.Add(batchedHistorianSink);
    var alarmSink = builder.Add(alarmCheckSink);

    builder.From(source).To(broadcast);
    builder.From(broadcast.Out(0)).To(stateSink);
    builder.From(broadcast.Out(1)).To(historianSink);
    builder.From(broadcast.Out(2)).To(alarmSink);

    return ClosedShape.Instance;
});

RunnableGraph.FromGraph(graph).Run(materializer);

Error Handling with Supervision

Stream stages can fail (protocol errors, serialization issues). Use stream supervision to handle errors without killing the entire pipeline:

// Akka.Streams.Supervision.Decider is a delegate: Exception => Directive
Decider decider = cause => cause switch
{
    FormatException => Directive.Resume,    // Skip the bad element and continue
    TimeoutException => Directive.Restart,  // Restart the stage, discarding its state
    _ => Directive.Resume                   // Default: skip the failed element and continue
};

var settings = ActorMaterializerSettings.Create(system)
    .WithSupervisionStrategy(decider);
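A materializer-wide strategy applies to every stream it runs. Supervision can also be scoped to a single stage with stream attributes; a sketch, reusing the `decider` and the `deadbandFilter` flow defined earlier:

```csharp
// Apply supervision to one flow only, leaving other stages at the default
var resilientDeadband = deadbandFilter
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider));
```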

Throttling High-Frequency Updates

If equipment sends updates faster than the system can process, throttle the stream:

Flow.Create<TagUpdate>()
    .Throttle(elements: 100, per: TimeSpan.FromSeconds(1), maximumBurst: 50, ThrottleMode.Shaping)

Anti-Patterns

Building Complex Business Logic in Streams

Streams excel at data flow — transforming, filtering, routing, batching. Do not embed complex business logic (alarm escalation rules, command sequencing, state machine transitions) in stream stages. Keep that in actors where state management and supervision are more natural.

Unbounded Buffers

Never use OverflowStrategy.Backpressure with an effectively unbounded buffer when the source is a protocol callback. If the protocol pushes faster than the stream processes, backpressure propagates to the protocol callback — which typically can't handle it (it's a fire-and-forget callback). Use DropHead or DropTail for tag updates where only recent values matter.

Creating Streams Per-Message

Do not materialize a new stream for each incoming message. Stream materialization has overhead (actor creation, buffer allocation). Materialize the stream once (at actor startup) and keep it running for the lifetime of the device actor.
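A sketch of the once-at-startup pattern, assuming a shared materializer is injected and `RawTagUpdate` as defined earlier; `StreamDone` is a hypothetical completion message:

```csharp
public class DeviceActor : ReceiveActor
{
    private readonly IMaterializer _materializer;   // shared, injected
    private ISourceQueueWithComplete<RawTagUpdate> _queue;

    public DeviceActor(IMaterializer materializer) => _materializer = materializer;

    protected override void PreStart()
    {
        // Materialize once; the queue handle lives as long as the actor does
        _queue = Source
            .Queue<RawTagUpdate>(1000, OverflowStrategy.DropHead)
            .To(Sink.ActorRef<RawTagUpdate>(Self, new StreamDone())) // completion message
            .Run(_materializer);
    }

    protected override void PostStop() =>
        _queue?.Complete();   // shut the stream down with the actor
}
```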

Ignoring Materialized Values

When a stream completes or fails, the materialized value (a Task) completes. Monitor this to detect stream failures:

var self = Self;  // capture — Self must not be touched from a continuation
var completionTask = stream.Run(materializer);
completionTask.ContinueWith(t =>
{
    if (t.IsFaulted)
        self.Tell(new StreamFailed(t.Exception));
}, TaskScheduler.Default);

Configuration Guidance

akka.stream {
  # Default materializer settings
  materializer {
    # Initial and maximum buffer sizes for stream stages
    initial-input-buffer-size = 4
    max-input-buffer-size = 16

    # Dispatcher for stream actors
    dispatcher = ""  # Uses default dispatcher

    # Subscription timeout — how long a publisher waits for a subscriber
    subscription-timeout {
      mode = cancel
      timeout = 5s
    }
  }
}

For the SCADA system, the default buffer sizes are fine. If profiling shows that specific high-throughput flows (e.g., historian batching) need larger buffers, increase max-input-buffer-size selectively by passing custom ActorMaterializerSettings to those flows.
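One way to do that, sketched with illustrative sizes (`WithInputBuffer` takes the initial and maximum buffer sizes):

```csharp
var settings = ActorMaterializerSettings.Create(system)
    .WithInputBuffer(16, 64);   // initial and max sizes; illustrative only

// Dedicated materializer for high-throughput historian flows only
var historianMaterializer = ActorMaterializer.Create(system, settings);
```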
