feat: enforce account-scoped remote delivery semantics
This commit is contained in:
@@ -48,13 +48,13 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct)
|
||||
=> WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct);
|
||||
|
||||
public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo;
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
var control = Encoding.ASCII.GetBytes($"GMSG {subject} {reply} {payload.Length}\r\n");
|
||||
var control = Encoding.ASCII.GetBytes($"GMSG {account} {subject} {reply} {payload.Length}\r\n");
|
||||
await _stream.WriteAsync(control, ct);
|
||||
if (!payload.IsEmpty)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
@@ -94,9 +94,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("A+ ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -104,9 +104,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("A- ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -115,12 +115,36 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
continue;
|
||||
|
||||
var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0)
|
||||
if (args.Length < 4)
|
||||
continue;
|
||||
|
||||
var account = "$G";
|
||||
string subject;
|
||||
string replyToken;
|
||||
string sizeToken;
|
||||
|
||||
// New format: GMSG <account> <subject> <reply> <size>
|
||||
// Legacy format: GMSG <subject> <reply> <size>
|
||||
if (args.Length >= 5 && !LooksLikeSubject(args[1]))
|
||||
{
|
||||
account = args[1];
|
||||
subject = args[2];
|
||||
replyToken = args[3];
|
||||
sizeToken = args[4];
|
||||
}
|
||||
else
|
||||
{
|
||||
subject = args[1];
|
||||
replyToken = args[2];
|
||||
sizeToken = args[3];
|
||||
}
|
||||
|
||||
if (!int.TryParse(sizeToken, out var size) || size < 0)
|
||||
continue;
|
||||
|
||||
var payload = await ReadPayloadAsync(size, ct);
|
||||
if (MessageReceived != null)
|
||||
await MessageReceived(new GatewayMessage(args[1], args[2] == "-" ? null : args[2], payload));
|
||||
await MessageReceived(new GatewayMessage(subject, replyToken == "-" ? null : replyToken, payload, account));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,4 +239,4 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
|| token.Contains('>', StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);
|
||||
public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload, string Account = "$G");
|
||||
|
||||
@@ -64,16 +64,16 @@ public sealed class GatewayManager : IAsyncDisposable
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
foreach (var connection in _connections.Values)
|
||||
await connection.SendMessageAsync(subject, replyTo, payload, ct);
|
||||
await connection.SendMessageAsync(account, subject, replyTo, payload, ct);
|
||||
}
|
||||
|
||||
public async Task ForwardJetStreamClusterMessageAsync(GatewayMessage message, CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref _forwardedJetStreamClusterMessages);
|
||||
await ForwardMessageAsync(message.Subject, message.ReplyTo, message.Payload, ct);
|
||||
await ForwardMessageAsync(message.Account, message.Subject, message.ReplyTo, message.Payload, ct);
|
||||
}
|
||||
|
||||
public void PropagateLocalSubscription(string account, string subject, string? queue)
|
||||
|
||||
@@ -48,13 +48,13 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
|
||||
public Task SendLsMinusAsync(string account, string subject, string? queue, CancellationToken ct)
|
||||
=> WriteLineAsync(queue is { Length: > 0 } ? $"LS- {account} {subject} {queue}" : $"LS- {account} {subject}", ct);
|
||||
|
||||
public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo;
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
var control = Encoding.ASCII.GetBytes($"LMSG {subject} {reply} {payload.Length}\r\n");
|
||||
var control = Encoding.ASCII.GetBytes($"LMSG {account} {subject} {reply} {payload.Length}\r\n");
|
||||
await _stream.WriteAsync(control, ct);
|
||||
if (!payload.IsEmpty)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
@@ -94,9 +94,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("LS+ ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -104,9 +104,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("LS- ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -115,12 +115,36 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
|
||||
continue;
|
||||
|
||||
var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0)
|
||||
if (args.Length < 4)
|
||||
continue;
|
||||
|
||||
var account = "$G";
|
||||
string subject;
|
||||
string replyToken;
|
||||
string sizeToken;
|
||||
|
||||
// New format: LMSG <account> <subject> <reply> <size>
|
||||
// Legacy format: LMSG <subject> <reply> <size>
|
||||
if (args.Length >= 5 && !LooksLikeSubject(args[1]))
|
||||
{
|
||||
account = args[1];
|
||||
subject = args[2];
|
||||
replyToken = args[3];
|
||||
sizeToken = args[4];
|
||||
}
|
||||
else
|
||||
{
|
||||
subject = args[1];
|
||||
replyToken = args[2];
|
||||
sizeToken = args[3];
|
||||
}
|
||||
|
||||
if (!int.TryParse(sizeToken, out var size) || size < 0)
|
||||
continue;
|
||||
|
||||
var payload = await ReadPayloadAsync(size, ct);
|
||||
if (MessageReceived != null)
|
||||
await MessageReceived(new LeafMessage(args[1], args[2] == "-" ? null : args[2], payload));
|
||||
await MessageReceived(new LeafMessage(subject, replyToken == "-" ? null : replyToken, payload, account));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,4 +239,4 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
|
||||
|| token.Contains('>', StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);
|
||||
public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload, string Account = "$G");
|
||||
|
||||
@@ -58,10 +58,10 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
foreach (var connection in _connections.Values)
|
||||
await connection.SendMessageAsync(subject, replyTo, payload, ct);
|
||||
await connection.SendMessageAsync(account, subject, replyTo, payload, ct);
|
||||
}
|
||||
|
||||
public void PropagateLocalSubscription(string account, string subject, string? queue)
|
||||
|
||||
@@ -871,7 +871,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
private void ProcessRoutedMessage(RouteMessage message)
|
||||
{
|
||||
DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload);
|
||||
DeliverRemoteMessage(message.Account, message.Subject, message.ReplyTo, message.Payload);
|
||||
}
|
||||
|
||||
private void ProcessGatewayMessage(GatewayMessage message)
|
||||
@@ -880,7 +880,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
if (ReplyMapper.TryRestoreGatewayReply(replyTo, out var restoredReply))
|
||||
replyTo = restoredReply;
|
||||
|
||||
DeliverRemoteMessage(message.Subject, replyTo, message.Payload);
|
||||
DeliverRemoteMessage(message.Account, message.Subject, replyTo, message.Payload);
|
||||
}
|
||||
|
||||
private void ProcessLeafMessage(LeafMessage message)
|
||||
@@ -892,12 +892,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
if (LeafLoopDetector.TryUnmark(subject, out var unmarked))
|
||||
subject = unmarked;
|
||||
|
||||
DeliverRemoteMessage(subject, message.ReplyTo, message.Payload);
|
||||
DeliverRemoteMessage(message.Account, subject, message.ReplyTo, message.Payload);
|
||||
}
|
||||
|
||||
private void DeliverRemoteMessage(string subject, string? replyTo, ReadOnlyMemory<byte> payload)
|
||||
private void DeliverRemoteMessage(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
var result = _globalAccount.SubList.Match(subject);
|
||||
var targetAccount = GetOrCreateAccount(account);
|
||||
var result = targetAccount.SubList.Match(subject);
|
||||
|
||||
foreach (var sub in result.PlainSubs)
|
||||
DeliverMessage(sub, subject, replyTo, default, payload);
|
||||
@@ -948,17 +949,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
var senderAccount = sender.Account ?? _globalAccount;
|
||||
if (_routeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
|
||||
_routeManager.ForwardRoutedMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult();
|
||||
_routeManager.ForwardRoutedMessageAsync(senderAccount.Name, subject, replyTo, payload, default).GetAwaiter().GetResult();
|
||||
if (_gatewayManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
|
||||
{
|
||||
var mappedReplyTo = ReplyMapper.ToGatewayReply(replyTo, ServerId);
|
||||
_gatewayManager.ForwardMessageAsync(subject, mappedReplyTo, payload, default).GetAwaiter().GetResult();
|
||||
_gatewayManager.ForwardMessageAsync(senderAccount.Name, subject, mappedReplyTo, payload, default).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
if (_leafNodeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
|
||||
{
|
||||
var markedSubject = LeafLoopDetector.Mark(subject, ServerId);
|
||||
_leafNodeManager.ForwardMessageAsync(markedSubject, replyTo, payload, default).GetAwaiter().GetResult();
|
||||
_leafNodeManager.ForwardMessageAsync(senderAccount.Name, markedSubject, replyTo, payload, default).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
|
||||
|
||||
@@ -57,13 +57,13 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
await WriteLineAsync(frame, ct);
|
||||
}
|
||||
|
||||
public async Task SendRmsgAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task SendRmsgAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
var replyToken = string.IsNullOrEmpty(replyTo) ? "-" : replyTo;
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
var control = Encoding.ASCII.GetBytes($"RMSG {subject} {replyToken} {payload.Length}\r\n");
|
||||
var control = Encoding.ASCII.GetBytes($"RMSG {account} {subject} {replyToken} {payload.Length}\r\n");
|
||||
await _stream.WriteAsync(control, ct);
|
||||
if (!payload.IsEmpty)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
@@ -116,9 +116,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("RS+ ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -126,9 +126,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
if (line.StartsWith("RS- ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
|
||||
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
|
||||
{
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, account));
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -140,14 +140,34 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
if (args.Length < 4)
|
||||
continue;
|
||||
|
||||
var subject = args[1];
|
||||
var reply = args[2] == "-" ? null : args[2];
|
||||
if (!int.TryParse(args[3], out var size) || size < 0)
|
||||
var account = "$G";
|
||||
string subject;
|
||||
string replyToken;
|
||||
string sizeToken;
|
||||
|
||||
// New format: RMSG <account> <subject> <reply> <size>
|
||||
// Legacy format: RMSG <subject> <reply> <size>
|
||||
if (args.Length >= 5 && !LooksLikeSubject(args[1]))
|
||||
{
|
||||
account = args[1];
|
||||
subject = args[2];
|
||||
replyToken = args[3];
|
||||
sizeToken = args[4];
|
||||
}
|
||||
else
|
||||
{
|
||||
subject = args[1];
|
||||
replyToken = args[2];
|
||||
sizeToken = args[3];
|
||||
}
|
||||
|
||||
var reply = replyToken == "-" ? null : replyToken;
|
||||
if (!int.TryParse(sizeToken, out var size) || size < 0)
|
||||
continue;
|
||||
|
||||
var payload = await ReadPayloadAsync(size, ct);
|
||||
if (RoutedMessageReceived != null)
|
||||
await RoutedMessageReceived(new RouteMessage(subject, reply, payload));
|
||||
await RoutedMessageReceived(new RouteMessage(subject, reply, payload, account));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,4 +286,4 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);
|
||||
public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload, string Account = "$G");
|
||||
|
||||
@@ -114,13 +114,13 @@ public sealed class RouteManager : IAsyncDisposable
|
||||
_ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
|
||||
}
|
||||
|
||||
public async Task ForwardRoutedMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
if (_routes.IsEmpty)
|
||||
return;
|
||||
|
||||
foreach (var route in _routes.Values)
|
||||
await route.SendRmsgAsync(subject, replyTo, payload, ct);
|
||||
await route.SendRmsgAsync(account, subject, replyTo, payload, ct);
|
||||
}
|
||||
|
||||
private async Task AcceptLoopAsync(CancellationToken ct)
|
||||
|
||||
Reference in New Issue
Block a user