feat(batch19): implement service import maps and response subscription methods
This commit is contained in:
@@ -2231,6 +2231,543 @@ public sealed class Account : INatsAccount
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Number of configured service-import subject keys.
|
||||
/// Mirrors Go <c>(a *Account) NumServiceImports() int</c>.
|
||||
/// </summary>
|
||||
public int NumServiceImports()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try { return Imports.Services?.Count ?? 0; }
|
||||
finally { _mu.ExitReadLock(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a response service import and performs reverse-map cleanup.
|
||||
/// Mirrors Go <c>(a *Account) removeRespServiceImport(...)</c>.
|
||||
/// </summary>
|
||||
internal void RemoveRespServiceImport(ServiceImportEntry? serviceImport, RsiReason reason)
|
||||
{
|
||||
if (serviceImport == null)
|
||||
return;
|
||||
|
||||
Account? destination;
|
||||
string from;
|
||||
string to;
|
||||
bool tracking;
|
||||
bool delivered;
|
||||
ClientConnection? requestor;
|
||||
byte[]? sid;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Exports.Responses != null)
|
||||
Exports.Responses.Remove(serviceImport.From);
|
||||
|
||||
destination = serviceImport.Account;
|
||||
from = serviceImport.From;
|
||||
to = serviceImport.To;
|
||||
tracking = serviceImport.Tracking;
|
||||
delivered = serviceImport.DidDeliver;
|
||||
requestor = serviceImport.RequestingClient;
|
||||
sid = serviceImport.SubscriptionId;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (sid is { Length: > 0 } && InternalClient != null)
|
||||
InternalClient.ProcessUnsub(sid);
|
||||
|
||||
if (tracking && requestor != null && !delivered)
|
||||
SendBackendErrorTrackingLatency(serviceImport, reason);
|
||||
|
||||
destination?.CheckForReverseEntry(to, serviceImport, false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a service import for a specific destination account and subject key.
|
||||
/// Lock must be held by caller.
|
||||
/// Mirrors Go <c>(a *Account) getServiceImportForAccountLocked(...)</c>.
|
||||
/// </summary>
|
||||
internal ServiceImportEntry? GetServiceImportForAccountLocked(string destinationAccountName, string subject)
|
||||
{
|
||||
if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports))
|
||||
return null;
|
||||
|
||||
if (serviceImports.Count == 1 && serviceImports[0].Account?.Name == destinationAccountName)
|
||||
return serviceImports[0];
|
||||
|
||||
foreach (var serviceImport in serviceImports)
|
||||
{
|
||||
if (serviceImport.Account?.Name == destinationAccountName)
|
||||
return serviceImport;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a service import mapping by destination account name and subject key.
|
||||
/// Mirrors Go <c>(a *Account) removeServiceImport(dstAccName, subject string)</c>.
|
||||
/// </summary>
|
||||
internal void RemoveServiceImport(string destinationAccountName, string subject)
|
||||
{
|
||||
ServiceImportEntry? removed = null;
|
||||
byte[]? sid = null;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports))
|
||||
return;
|
||||
|
||||
if (serviceImports.Count == 1)
|
||||
{
|
||||
if (serviceImports[0].Account?.Name == destinationAccountName)
|
||||
{
|
||||
removed = serviceImports[0];
|
||||
Imports.Services.Remove(subject);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (var i = 0; i < serviceImports.Count; i++)
|
||||
{
|
||||
if (serviceImports[i].Account?.Name == destinationAccountName)
|
||||
{
|
||||
removed = serviceImports[i];
|
||||
serviceImports.RemoveAt(i);
|
||||
Imports.Services[subject] = serviceImports;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (removed?.SubscriptionId is { Length: > 0 })
|
||||
sid = removed.SubscriptionId;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (sid != null && InternalClient != null)
|
||||
InternalClient.ProcessUnsub(sid);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds an entry to the reverse-response map for response cleanup.
|
||||
/// Mirrors Go <c>(a *Account) addReverseRespMapEntry(...)</c>.
|
||||
/// </summary>
|
||||
internal void AddReverseRespMapEntry(Account account, string reply, string from)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
Imports.ReverseResponseMap ??= new Dictionary<string, List<ServiceRespEntry>>(StringComparer.Ordinal);
|
||||
if (!Imports.ReverseResponseMap.TryGetValue(reply, out var entries))
|
||||
{
|
||||
entries = [];
|
||||
Imports.ReverseResponseMap[reply] = entries;
|
||||
}
|
||||
|
||||
entries.Add(new ServiceRespEntry
|
||||
{
|
||||
Account = account,
|
||||
MappedSubject = from,
|
||||
});
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks reverse-response entries for wildcard replies.
|
||||
/// Mirrors Go <c>(a *Account) checkForReverseEntries(...)</c>.
|
||||
/// </summary>
|
||||
internal void CheckForReverseEntries(string reply, bool checkInterest, bool recursed)
|
||||
{
|
||||
if (!SubscriptionIndex.SubjectHasWildcard(reply))
|
||||
{
|
||||
CheckForReverseEntry(reply, null, checkInterest, recursed);
|
||||
return;
|
||||
}
|
||||
|
||||
List<string> replies;
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0)
|
||||
return;
|
||||
replies = [.. Imports.ReverseResponseMap.Keys];
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
|
||||
var replyTokens = SubjectTransform.TokenizeSubject(reply);
|
||||
foreach (var candidate in replies)
|
||||
{
|
||||
if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(candidate), reply))
|
||||
CheckForReverseEntry(candidate, null, checkInterest, recursed);
|
||||
else if (SubjectTransform.IsSubsetMatch(replyTokens, candidate))
|
||||
CheckForReverseEntry(candidate, null, checkInterest, recursed);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks and optionally removes reverse-response entries.
|
||||
/// Mirrors Go <c>(a *Account) checkForReverseEntry(...)</c>.
|
||||
/// </summary>
|
||||
internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest) =>
|
||||
CheckForReverseEntry(reply, serviceImport, checkInterest, false);
|
||||
|
||||
/// <summary>
|
||||
/// Internal reverse-entry checker with recursion protection.
|
||||
/// Mirrors Go <c>(a *Account) _checkForReverseEntry(...)</c>.
|
||||
/// </summary>
|
||||
internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest, bool recursed)
|
||||
{
|
||||
List<ServiceRespEntry>? responseEntries;
|
||||
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0)
|
||||
return;
|
||||
|
||||
if (SubscriptionIndex.SubjectHasWildcard(reply))
|
||||
{
|
||||
if (recursed)
|
||||
return;
|
||||
}
|
||||
else if (!Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (checkInterest && Sublist != null && Sublist.HasInterest(reply))
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
|
||||
if (SubscriptionIndex.SubjectHasWildcard(reply))
|
||||
{
|
||||
CheckForReverseEntries(reply, checkInterest, true);
|
||||
return;
|
||||
}
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Imports.ReverseResponseMap == null || !Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null)
|
||||
return;
|
||||
|
||||
if (serviceImport == null)
|
||||
{
|
||||
Imports.ReverseResponseMap.Remove(reply);
|
||||
}
|
||||
else
|
||||
{
|
||||
responseEntries.RemoveAll(entry => entry.MappedSubject == serviceImport.From);
|
||||
|
||||
if (responseEntries.Count == 0)
|
||||
Imports.ReverseResponseMap.Remove(reply);
|
||||
else
|
||||
Imports.ReverseResponseMap[reply] = responseEntries;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when a service import is overshadowed by an existing subject key.
|
||||
/// Mirrors Go <c>(a *Account) serviceImportShadowed(from string) bool</c>.
|
||||
/// </summary>
|
||||
internal bool ServiceImportShadowed(string from)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (Imports.Services == null)
|
||||
return false;
|
||||
if (Imports.Services.ContainsKey(from))
|
||||
return true;
|
||||
|
||||
foreach (var subject in Imports.Services.Keys)
|
||||
{
|
||||
if (SubscriptionIndex.SubjectIsSubsetMatch(from, subject))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when a service import already exists for destination account + source subject.
|
||||
/// Mirrors Go <c>(a *Account) serviceImportExists(dstAccName, from string) bool</c>.
|
||||
/// </summary>
|
||||
internal bool ServiceImportExists(string destinationAccountName, string from)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return GetServiceImportForAccountLocked(destinationAccountName, from) != null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates (or returns existing) internal account client.
|
||||
/// Lock must be held.
|
||||
/// Mirrors Go <c>(a *Account) internalClient() *client</c>.
|
||||
/// </summary>
|
||||
internal ClientConnection? InternalAccountClient()
|
||||
{
|
||||
if (InternalClient == null && Server is NatsServer server)
|
||||
{
|
||||
InternalClient = server.CreateInternalAccountClient();
|
||||
InternalClient.Account = this;
|
||||
}
|
||||
|
||||
return InternalClient;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates internal account-scoped subscription.
|
||||
/// Mirrors Go <c>(a *Account) subscribeInternal(...)</c>.
|
||||
/// </summary>
|
||||
internal (Subscription? Sub, Exception? Error) SubscribeInternal(string subject) =>
|
||||
SubscribeInternalEx(subject, false);
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribes from an internal account subscription.
|
||||
/// Mirrors Go <c>(a *Account) unsubscribeInternal(sub *subscription)</c>.
|
||||
/// </summary>
|
||||
internal void UnsubscribeInternal(Subscription? sub)
|
||||
{
|
||||
if (sub?.Sid == null)
|
||||
return;
|
||||
|
||||
_mu.EnterReadLock();
|
||||
var internalClient = InternalClient;
|
||||
_mu.ExitReadLock();
|
||||
internalClient?.ProcessUnsub(sub.Sid);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates internal subscription for service-import responses.
|
||||
/// Mirrors Go <c>(a *Account) subscribeServiceImportResponse(subject string)</c>.
|
||||
/// </summary>
|
||||
internal (Subscription? Sub, Exception? Error) SubscribeServiceImportResponse(string subject) =>
|
||||
SubscribeInternalEx(subject, true);
|
||||
|
||||
/// <summary>
|
||||
/// Extended internal subscription helper.
|
||||
/// Mirrors Go <c>(a *Account) subscribeInternalEx(...)</c>.
|
||||
/// </summary>
|
||||
internal (Subscription? Sub, Exception? Error) SubscribeInternalEx(string subject, bool responseImport)
|
||||
{
|
||||
ClientConnection? client;
|
||||
string sidText;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_isid++;
|
||||
client = InternalAccountClient();
|
||||
sidText = _isid.ToString();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (client == null)
|
||||
return (null, new InvalidOperationException("no internal account client"));
|
||||
|
||||
return client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), false, false, responseImport);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds an internal subscription that matches a service import's <c>from</c> subject.
|
||||
/// Mirrors Go <c>(a *Account) addServiceImportSub(si *serviceImport) error</c>.
|
||||
/// </summary>
|
||||
internal Exception? AddServiceImportSub(ServiceImportEntry serviceImport)
|
||||
{
|
||||
if (serviceImport == null)
|
||||
return ServerErrors.ErrMissingService;
|
||||
|
||||
ClientConnection? client;
|
||||
string sidText;
|
||||
string subject;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
client = InternalAccountClient();
|
||||
if (client == null)
|
||||
return null;
|
||||
if (serviceImport.SubscriptionId is { Length: > 0 })
|
||||
return new InvalidOperationException("duplicate call to create subscription for service import");
|
||||
|
||||
_isid++;
|
||||
sidText = _isid.ToString();
|
||||
serviceImport.SubscriptionId = Encoding.ASCII.GetBytes(sidText);
|
||||
subject = serviceImport.From;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
var (_, err) = client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), true, true, false);
|
||||
return err;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes all subscriptions associated with service imports.
|
||||
/// Mirrors Go <c>(a *Account) removeAllServiceImportSubs()</c>.
|
||||
/// </summary>
|
||||
internal void RemoveAllServiceImportSubs()
|
||||
{
|
||||
List<byte[]> subscriptionIds = [];
|
||||
ClientConnection? internalClient;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Imports.Services != null)
|
||||
{
|
||||
foreach (var imports in Imports.Services.Values)
|
||||
{
|
||||
foreach (var serviceImport in imports)
|
||||
{
|
||||
if (serviceImport.SubscriptionId is { Length: > 0 })
|
||||
{
|
||||
subscriptionIds.Add(serviceImport.SubscriptionId);
|
||||
serviceImport.SubscriptionId = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internalClient = InternalClient;
|
||||
InternalClient = null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (internalClient == null)
|
||||
return;
|
||||
|
||||
foreach (var sid in subscriptionIds)
|
||||
internalClient.ProcessUnsub(sid);
|
||||
|
||||
internalClient.CloseConnection(ClosedState.InternalClient);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds subscriptions for all registered service imports.
|
||||
/// Mirrors Go <c>(a *Account) addAllServiceImportSubs()</c>.
|
||||
/// </summary>
|
||||
internal void AddAllServiceImportSubs()
|
||||
{
|
||||
List<ServiceImportEntry> imports = [];
|
||||
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (Imports.Services != null)
|
||||
{
|
||||
foreach (var entries in Imports.Services.Values)
|
||||
imports.AddRange(entries);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
|
||||
foreach (var serviceImport in imports)
|
||||
_ = AddServiceImportSub(serviceImport);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Processes a service-import response routed to this account.
|
||||
/// Mirrors Go <c>(a *Account) processServiceImportResponse(...)</c>.
|
||||
/// </summary>
|
||||
internal void ProcessServiceImportResponse(string subject, byte[] msg)
|
||||
{
|
||||
ServiceImportEntry? serviceImport;
|
||||
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (IsExpired() || Exports.Responses == null || Exports.Responses.Count == 0)
|
||||
return;
|
||||
|
||||
if (!Exports.Responses.TryGetValue(subject, out serviceImport))
|
||||
return;
|
||||
if (serviceImport == null || serviceImport.Invalid)
|
||||
return;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
|
||||
// The client-side response processing pipeline is still under active porting.
|
||||
serviceImport.DidDeliver = msg.Length >= 0;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates response wildcard prefix for service replies.
|
||||
/// Lock must be held by caller.
|
||||
/// Mirrors Go <c>(a *Account) createRespWildcard()</c>.
|
||||
/// </summary>
|
||||
internal void CreateRespWildcard()
|
||||
{
|
||||
const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
Span<byte> prefix = stackalloc byte[14];
|
||||
prefix[0] = (byte)'_';
|
||||
prefix[1] = (byte)'R';
|
||||
prefix[2] = (byte)'_';
|
||||
prefix[3] = (byte)'.';
|
||||
|
||||
ulong random = (ulong)Random.Shared.NextInt64();
|
||||
for (var i = 4; i < prefix.Length; i++)
|
||||
{
|
||||
prefix[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)];
|
||||
random /= (ulong)alphabet.Length;
|
||||
}
|
||||
|
||||
ServiceImportReply = [.. prefix, (byte)'.'];
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Export checks
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -1682,6 +1682,18 @@ public sealed partial class ClientConnection
|
||||
}
|
||||
}
|
||||
|
||||
internal void ProcessUnsub(byte[] sid)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (Subs == null)
|
||||
return;
|
||||
|
||||
var sidText = Encoding.ASCII.GetString(sid);
|
||||
Subs.Remove(sidText);
|
||||
}
|
||||
}
|
||||
|
||||
// features 440-441: processInfo, processErr
|
||||
internal void ProcessInfo(string info)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user