diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index 96f84d8..b5707f8 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -58,6 +58,7 @@ public sealed class InternalEventSystem : IAsyncDisposable private ulong _sequence; private int _subscriptionId; + private readonly ConcurrentDictionary _callbacks = new(); public Account SystemAccount { get; } public InternalClient SystemClient { get; } @@ -100,20 +101,26 @@ public sealed class InternalEventSystem : IAsyncDisposable Client = SystemClient, }; - // Wrap callback in noInlineCallback pattern: enqueue to receive loop + // Store callback keyed by SID so multiple subscriptions work + _callbacks[sid] = callback; + + // Set a single routing callback on the system client that dispatches by SID SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => { - _receiveQueue.Writer.TryWrite(new InternalSystemMessage + if (_callbacks.TryGetValue(s, out var cb)) { - Sub = sub, - Client = SystemClient, - Account = SystemAccount, - Subject = subj, - Reply = reply, - Headers = hdr, - Message = msg, - Callback = callback, - }); + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = cb, + }); + } }; SystemAccount.SubList.Insert(sub);