diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 0e7a8e9..83c3f2f 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -227,6 +227,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { var subList = sender.Account?.SubList ?? _globalAccount.SubList; var result = subList.Match(subject); + var delivered = false; // Deliver to plain subscribers foreach (var sub in result.PlainSubs) @@ -235,6 +236,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable continue; DeliverMessage(sub, subject, replyTo, headers, payload); + delivered = true; } // Deliver to one member of each queue group (round-robin) @@ -253,10 +255,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) { DeliverMessage(sub, subject, replyTo, headers, payload); + delivered = true; break; } } } + + // No-responders: if nobody received the message and the publisher + // opted in, send back a 503 status HMSG on the reply subject. + if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true) + { + SendNoResponders(sender, replyTo); + } } private static void DeliverMessage(Subscription sub, string subject, string? replyTo, @@ -273,6 +283,36 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable client.SendMessage(subject, sub.Sid, replyTo, headers, payload); } + private static void SendNoResponders(NatsClient sender, string replyTo) + { + // Find the sid for a subscription matching the reply subject + var sid = string.Empty; + foreach (var sub in sender.Subscriptions.Values) + { + if (SubjectMatch.MatchLiteral(replyTo, sub.Subject)) + { + sid = sub.Sid; + break; + } + } + + // Build: HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n{headers}\r\n + var headerBlock = "NATS/1.0 503\r\n\r\n"u8; + var hdrLen = headerBlock.Length; + var controlLine = Encoding.ASCII.GetBytes($"HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n"); + + var totalLen = controlLine.Length + hdrLen + NatsProtocol.CrLf.Length; + var msg = new byte[totalLen]; + var offset = 0; + controlLine.CopyTo(msg.AsSpan(offset)); + offset += controlLine.Length; + headerBlock.CopyTo(msg.AsSpan(offset)); + offset += hdrLen; + NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset)); + + sender.QueueOutbound(msg); + } + public Account GetOrCreateAccount(string name) { return _accounts.GetOrAdd(name, n => new Account(n));