Notes and documentation covering actors, remoting, clustering, persistence, streams, serialization, hosting, testing, and best practices for the Akka.NET framework used throughout the ScadaLink system.
# 19 — Akka.IO (TCP/UDP Networking)

## Overview

Akka.IO provides actor-based, non-blocking TCP and UDP networking built into the core Akka library. Instead of working with raw sockets, you interact with special I/O manager actors that handle connection lifecycle, reading, and writing through messages. This fits naturally into the actor model — connection state is managed by actors, and data flows through the mailbox.

In the SCADA system, Akka.IO is a candidate for implementing the custom legacy protocol adapter. If the custom protocol runs over raw TCP sockets (not HTTP, not a managed library), Akka.IO provides the connection management layer.
## When to Use

- Implementing the custom legacy SCADA protocol adapter if it communicates over raw TCP or UDP sockets
- Building any custom network protocol handler that needs actor-based lifecycle management
- When you need non-blocking I/O integrated with the actor supervision model (automatic reconnection on failure)
## When Not to Use

- OPC-UA communication — use an OPC-UA client library (e.g., OPC Foundation's .NET Standard Stack), not raw sockets
- HTTP communication — use `HttpClient` or ASP.NET Core
- If the custom protocol has its own managed .NET client library — use that library instead, wrapping it in an actor
- High-throughput bulk data transfer — Akka.Streams with custom stages may be more appropriate for backpressure handling
## Design Decisions for the SCADA System

### Custom Protocol Actor with Akka.IO TCP

If the custom legacy protocol is a proprietary TCP-based protocol, model each device connection as an actor that uses Akka.IO:
```csharp
public class CustomProtocolConnectionActor : ReceiveActor
{
    private readonly EndPoint _remoteEndpoint;
    private IActorRef _connection;

    private sealed class RetryConnect { } // internal retry trigger message

    public CustomProtocolConnectionActor(DeviceConfig config)
    {
        _remoteEndpoint = new DnsEndPoint(config.Hostname, config.Port);
        Become(Disconnected);
    }

    private void Disconnected()
    {
        // Request a TCP connection from the system-wide TCP manager
        Context.System.Tcp().Tell(new Tcp.Connect(_remoteEndpoint));

        Receive<Tcp.Connected>(connected =>
        {
            _connection = Sender;
            _connection.Tell(new Tcp.Register(Self)); // route incoming data to this actor
            Become(Connected);
        });

        Receive<Tcp.CommandFailed>(failed =>
        {
            // Connection failed — schedule a retry
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.FromSeconds(5), Self, new RetryConnect(), ActorRefs.NoSender);
        });

        Receive<RetryConnect>(_ => Become(Disconnected)); // re-runs the Connect request above
    }

    private void Connected()
    {
        Receive<Tcp.Received>(received =>
        {
            // Parse the custom protocol frame from received.Data
            var frame = CustomProtocolParser.Parse(received.Data);
            HandleFrame(frame);
        });

        Receive<SendCustomCommand>(cmd =>
        {
            var bytes = CustomProtocolSerializer.Serialize(cmd);
            _connection.Tell(Tcp.Write.Create(ByteString.FromBytes(bytes)));
        });

        Receive<Tcp.ConnectionClosed>(closed =>
        {
            _connection = null;
            Become(Disconnected);
        });
    }
}
```
### Frame Parsing and Buffering

Industrial protocols often use framed messages (length-prefixed or delimited). TCP delivers data as a byte stream, so you must handle partial reads and frame reassembly:
```csharp
private ByteString _buffer = ByteString.Empty;

private void HandleReceived(Tcp.Received received)
{
    _buffer = _buffer.Concat(received.Data);

    // Drain every complete frame currently sitting in the buffer
    while (TryParseFrame(_buffer, out var frame, out var remaining))
    {
        _buffer = remaining;
        ProcessFrame(frame);
    }
}

private bool TryParseFrame(ByteString data, out CustomFrame frame, out ByteString remaining)
{
    // Not enough bytes for the 4-byte length prefix yet
    if (data.Count < 4)
    {
        frame = null;
        remaining = data;
        return false;
    }

    // Length prefix; byte order depends on the protocol (little-endian assumed here)
    var length = BitConverter.ToInt32(data.Slice(0, 4).ToArray(), 0);
    if (data.Count < 4 + length)
    {
        // Payload not fully received yet — wait for more data
        frame = null;
        remaining = data;
        return false;
    }

    frame = CustomFrame.Parse(data.Slice(4, length));
    remaining = data.Slice(4 + length);
    return true;
}
```
### Connection Supervision

Wrap the connection actor in a parent that supervises reconnection:
```csharp
// Parent actor's supervision strategy
protected override SupervisorStrategy SupervisorStrategy()
{
    return new OneForOneStrategy(
        maxNrOfRetries: -1,                        // unlimited retries
        withinTimeRange: TimeSpan.FromMinutes(1),
        decider: Decider.From(Directive.Restart)); // restart on any exception — re-establishes the connection
}
```
### Akka.IO vs. Direct Socket Wrapper

If the custom protocol client already has a managed .NET library, wrapping it in an actor (without Akka.IO) is simpler:
```csharp
public class CustomProtocolDeviceActor : ReceiveActor
{
    private readonly CustomProtocolClient _client; // existing library

    public CustomProtocolDeviceActor(DeviceConfig config)
    {
        _client = new CustomProtocolClient(config.Hostname, config.Port);

        // Library callbacks may fire on arbitrary threads — never touch actor
        // state directly here; turn the event into a message instead.
        _client.OnTagChanged += (tag, value) =>
            Self.Tell(new TagValueReceived(tag, value));

        ReceiveAsync<ConnectToDevice>(async _ => await _client.ConnectAsync());
        Receive<TagValueReceived>(HandleTagUpdate);
    }
}
```
Use Akka.IO only if you need actor-level control over the TCP connection lifecycle, or if no managed client library exists.
## Common Patterns

### Tag Subscription via Polling or Push

If the custom protocol supports push-based tag subscriptions (the device sends updates when values change), the connection actor receives `Tcp.Received` messages passively. If polling is required, use the scheduler:
```csharp
// Polling pattern: request every subscribed tag once per second
Context.System.Scheduler.ScheduleTellRepeatedly(
    TimeSpan.FromSeconds(1),  // initial delay
    TimeSpan.FromSeconds(1),  // interval
    Self,
    new PollTags(),
    ActorRefs.NoSender);

Receive<PollTags>(_ =>
{
    foreach (var tag in _subscribedTags)
    {
        var request = CustomProtocolSerializer.CreateReadRequest(tag);
        _connection.Tell(Tcp.Write.Create(ByteString.FromBytes(request)));
    }
});
```
### Backpressure with Ack-Based Writing

For high-throughput writes, use Akka.IO's ack-based flow control:
```csharp
// The ack token must be a Tcp.Event so Akka.IO can deliver it back to the actor
private sealed class WriteAck : Tcp.Event { }

_connection.Tell(Tcp.Write.Create(data, ack: new WriteAck()));

Receive<WriteAck>(_ =>
{
    // Previous write completed — safe to send the next one
    SendNextQueuedCommand();
});
```
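The snippet leaves `SendNextQueuedCommand` undefined. A minimal sketch of the queue behind it, assuming the `WriteAck` event and `_connection` reference from the example above (the `_pending` and `_writeInFlight` names are hypothetical):

```csharp
private readonly Queue<byte[]> _pending = new Queue<byte[]>();
private bool _writeInFlight;

private void EnqueueWrite(byte[] bytes)
{
    if (_writeInFlight)
    {
        _pending.Enqueue(bytes); // a write is already outstanding — queue this one
        return;
    }
    _writeInFlight = true;
    _connection.Tell(Tcp.Write.Create(ByteString.FromBytes(bytes), ack: new WriteAck()));
}

private void SendNextQueuedCommand()
{
    if (_pending.Count == 0)
    {
        _writeInFlight = false; // nothing queued — the next EnqueueWrite sends immediately
        return;
    }
    _connection.Tell(Tcp.Write.Create(ByteString.FromBytes(_pending.Dequeue()), ack: new WriteAck()));
}
```

With this shape, at most one `Tcp.Write` is outstanding at any time, so the actor can never flood the socket faster than the connection acknowledges.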
## Anti-Patterns

### Blocking Socket Operations in Actors

Never use synchronous socket calls (`Socket.Receive`, `Socket.Send`) inside an actor. This blocks the dispatcher thread. Akka.IO handles all I/O asynchronously.
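If the only available client API is blocking or `Task`-based, the usual escape hatch is to run the call off the dispatcher and `PipeTo` the result back as a message. In the sketch below, `ReadFromDeviceAsync`, `PollDevice`, and the two result messages are illustrative names; `PipeTo` itself is the standard Akka.NET helper:

```csharp
Receive<PollDevice>(_ =>
{
    // The I/O runs on the task pool; the actor keeps processing its mailbox
    // and receives the outcome as an ordinary message.
    ReadFromDeviceAsync()
        .PipeTo(Self,
            success: data => new DeviceDataReceived(data),
            failure: ex => new DeviceReadFailed(ex));
});
```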
### Not Handling Partial Reads

TCP is a stream protocol. A single `Tcp.Received` message may contain a partial frame, multiple frames, or a frame split across two receives. Always implement frame buffering and parsing.
### Creating One TCP Manager Per Device

The TCP manager (`Context.System.Tcp()`) is a singleton per ActorSystem. Do not create additional instances. Each device actor sends `Tcp.Connect` to the same TCP manager.
## Configuration Guidance
```hocon
akka.io.tcp {
  # Buffer pool settings — defaults are fine for SCADA scale
  buffer-pool = "akka.io.tcp.disabled-buffer-pool"

  # Maximum number of open channels
  max-channels = 1024 # sufficient for 500 devices

  # Batch sizes for reads
  received-message-size-limit = 65536 # 64 KB per read
  direct-buffer-size = 65536
}
```
For 500 device connections, the default TCP settings are adequate. Increase `max-channels` only if you anticipate more concurrent connections.
## References

- Official documentation: <https://getakka.net/articles/networking/io.html>
- Akka.IO configuration: <https://getakka.net/articles/configuration/modules/akka.html>