feat: add delivery tracking and no-responders 503 support to ProcessMessage

When a PUB with a reply-to subject has no matching subscribers and the
sender opted into no_responders, send a 503 HMSG back on the reply
subject so request-reply callers can fail fast instead of timing out.
This commit is contained in:
Joseph Doherty
2026-02-22 23:49:39 -05:00
parent b289041761
commit c522ce99f5

View File

@@ -227,6 +227,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
var result = subList.Match(subject);
var delivered = false;
// Deliver to plain subscribers
foreach (var sub in result.PlainSubs)
@@ -235,6 +236,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
continue;
DeliverMessage(sub, subject, replyTo, headers, payload);
delivered = true;
}
// Deliver to one member of each queue group (round-robin)
@@ -253,10 +255,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
{
DeliverMessage(sub, subject, replyTo, headers, payload);
delivered = true;
break;
}
}
}
// No-responders: if nobody received the message and the publisher
// opted in, send back a 503 status HMSG on the reply subject.
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
{
SendNoResponders(sender, replyTo);
}
}
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
@@ -273,6 +283,36 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
}
private static void SendNoResponders(NatsClient sender, string replyTo)
{
// Find the sid for a subscription matching the reply subject
var sid = string.Empty;
foreach (var sub in sender.Subscriptions.Values)
{
if (SubjectMatch.MatchLiteral(replyTo, sub.Subject))
{
sid = sub.Sid;
break;
}
}
// Build: HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n{headers}\r\n
var headerBlock = "NATS/1.0 503\r\n\r\n"u8;
var hdrLen = headerBlock.Length;
var controlLine = Encoding.ASCII.GetBytes($"HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n");
var totalLen = controlLine.Length + hdrLen + NatsProtocol.CrLf.Length;
var msg = new byte[totalLen];
var offset = 0;
controlLine.CopyTo(msg.AsSpan(offset));
offset += controlLine.Length;
headerBlock.CopyTo(msg.AsSpan(offset));
offset += hdrLen;
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
sender.QueueOutbound(msg);
}
public Account GetOrCreateAccount(string name)
{
return _accounts.GetOrAdd(name, n => new Account(n));