feat(batch19): implement service reply and stream import/export methods

This commit is contained in:
Joseph Doherty
2026-02-28 20:15:59 -05:00
parent f4dfbf49bd
commit 5493703280
2 changed files with 414 additions and 0 deletions

View File

@@ -2768,6 +2768,420 @@ public sealed class Account : INatsAccount
ServiceImportReply = [.. prefix, (byte)'.'];
}
/// <summary>
/// Generates a new service reply subject.
/// Mirrors Go <c>(a *Account) newServiceReply(tracking bool) []byte</c>.
/// </summary>
internal byte[] NewServiceReply(bool tracking)
{
bool createdPrefix = false;
byte[] replyPrefix;
_mu.EnterWriteLock();
try
{
if (ServiceImportReply == null)
{
CreateRespWildcard();
createdPrefix = true;
}
replyPrefix = ServiceImportReply ?? Encoding.ASCII.GetBytes("_R_.");
}
finally
{
_mu.ExitWriteLock();
}
if (createdPrefix)
_ = SubscribeServiceImportResponse(Encoding.ASCII.GetString([.. replyPrefix, (byte)'>']));
const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
Span<byte> randomPart = stackalloc byte[20];
ulong random = (ulong)Random.Shared.NextInt64();
for (var i = 0; i < randomPart.Length; i++)
{
randomPart[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)];
random /= (ulong)alphabet.Length;
}
var reply = new List<byte>(replyPrefix.Length + randomPart.Length + 2);
reply.AddRange(replyPrefix);
reply.AddRange(randomPart.ToArray());
if (tracking)
{
reply.Add((byte)'.');
reply.Add((byte)'T');
}
return [.. reply];
}
/// <summary>
/// Returns the response threshold for an exported service.
/// Mirrors Go <c>(a *Account) ServiceExportResponseThreshold(...)</c>.
/// </summary>
public (TimeSpan Threshold, Exception? Error) ServiceExportResponseThreshold(string export)
{
_mu.EnterReadLock();
try
{
var serviceExport = GetServiceExport(export);
if (serviceExport == null)
return (TimeSpan.Zero, new InvalidOperationException($"no export defined for \"{export}\""));
return (serviceExport.ResponseThreshold, null);
}
finally
{
_mu.ExitReadLock();
}
}
/// <summary>
/// Sets max response delivery time for an exported service.
/// Mirrors Go <c>(a *Account) SetServiceExportResponseThreshold(...)</c>.
/// </summary>
public Exception? SetServiceExportResponseThreshold(string export, TimeSpan maxTime)
{
_mu.EnterWriteLock();
try
{
if (IsClaimAccount())
return new InvalidOperationException("claim based accounts can not be updated directly");
var serviceExport = GetServiceExport(export);
if (serviceExport == null)
return new InvalidOperationException($"no export defined for \"{export}\"");
serviceExport.ResponseThreshold = maxTime;
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Enables/disables cross-account trace propagation on a service export.
/// Mirrors Go <c>(a *Account) SetServiceExportAllowTrace(...)</c>.
/// </summary>
public Exception? SetServiceExportAllowTrace(string export, bool allowTrace)
{
_mu.EnterWriteLock();
try
{
var serviceExport = GetServiceExport(export);
if (serviceExport == null)
return new InvalidOperationException($"no export defined for \"{export}\"");
serviceExport.AllowTrace = allowTrace;
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Creates internal response service import entry.
/// Mirrors Go <c>(a *Account) addRespServiceImport(...)</c>.
/// </summary>
internal ServiceImportEntry AddRespServiceImport(Account destination, string to, ServiceImportEntry originalServiceImport, bool tracking, Dictionary<string, string[]>? header)
{
var newReply = Encoding.ASCII.GetString(originalServiceImport.Account?.NewServiceReply(tracking) ?? NewServiceReply(tracking));
ServiceImportEntry responseImport;
_mu.EnterWriteLock();
try
{
responseImport = new ServiceImportEntry
{
Account = destination,
ServiceExport = originalServiceImport.ServiceExport,
From = newReply,
To = to,
ResponseType = originalServiceImport.ResponseType,
IsResponse = true,
Share = originalServiceImport.Share,
Timestamp = UtcNowUnixNanos(),
Tracking = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton,
TrackingHeader = header,
Latency = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton
? originalServiceImport.Latency
: null,
};
Exports.Responses ??= new Dictionary<string, ServiceImportEntry>(StringComparer.Ordinal);
Exports.Responses[newReply] = responseImport;
}
finally
{
_mu.ExitWriteLock();
}
destination.AddReverseRespMapEntry(this, to, newReply);
return responseImport;
}
/// <summary>
/// Adds stream import with optional claim context.
/// Mirrors Go <c>(a *Account) AddStreamImportWithClaim(...)</c>.
/// </summary>
public Exception? AddStreamImportWithClaim(Account account, string from, string prefix, object? importClaim) =>
AddStreamImportWithClaimInternal(account, from, prefix, false, importClaim);
/// <summary>
/// Internal stream import add helper.
/// Mirrors Go <c>(a *Account) addStreamImportWithClaim(...)</c>.
/// </summary>
internal Exception? AddStreamImportWithClaimInternal(Account account, string from, string prefix, bool allowTrace, object? importClaim)
{
if (account == null)
return ServerErrors.ErrMissingAccount;
if (!account.CheckStreamImportAuthorized(this, from, importClaim))
return ServerErrors.ErrStreamImportAuthorization;
if (!string.IsNullOrEmpty(prefix))
{
if (SubscriptionIndex.SubjectHasWildcard(prefix))
return ServerErrors.ErrStreamImportBadPrefix;
if (!prefix.EndsWith(".", StringComparison.Ordinal))
prefix += '.';
}
return AddMappedStreamImportWithClaimInternal(account, from, prefix + from, allowTrace, importClaim);
}
/// <summary>
/// Convenience helper for mapped stream imports without claim.
/// Mirrors Go <c>(a *Account) AddMappedStreamImport(...)</c>.
/// </summary>
public Exception? AddMappedStreamImport(Account account, string from, string to) =>
AddMappedStreamImportWithClaim(account, from, to, null);
/// <summary>
/// Adds mapped stream import with optional claim.
/// Mirrors Go <c>(a *Account) AddMappedStreamImportWithClaim(...)</c>.
/// </summary>
public Exception? AddMappedStreamImportWithClaim(Account account, string from, string to, object? importClaim) =>
AddMappedStreamImportWithClaimInternal(account, from, to, false, importClaim);
/// <summary>
/// Internal mapped stream import add helper.
/// Mirrors Go <c>(a *Account) addMappedStreamImportWithClaim(...)</c>.
/// </summary>
internal Exception? AddMappedStreamImportWithClaimInternal(Account account, string from, string to, bool allowTrace, object? importClaim)
{
if (account == null)
return ServerErrors.ErrMissingAccount;
if (!account.CheckStreamImportAuthorized(this, from, importClaim))
return ServerErrors.ErrStreamImportAuthorization;
if (string.IsNullOrEmpty(to))
to = from;
var cycleErr = StreamImportFormsCycle(account, to) ?? StreamImportFormsCycle(account, from);
if (cycleErr != null)
return cycleErr;
ISubjectTransformer? transform = null;
var usePublishedSubject = false;
if (SubscriptionIndex.SubjectHasWildcard(from))
{
if (to == from)
{
usePublishedSubject = true;
}
else
{
var (created, err) = SubjectTransform.New(from, to);
if (err != null)
return new InvalidOperationException($"failed to create mapping transform for stream import subject from \"{from}\" to \"{to}\": {err.Message}");
transform = created;
}
}
_mu.EnterWriteLock();
try
{
if (IsStreamImportDuplicate(account, from))
return ServerErrors.ErrStreamImportDuplicate;
Imports.Streams ??= [];
Imports.Streams.Add(new StreamImportEntry
{
Account = account,
From = from,
To = to,
Transform = transform,
Claim = importClaim,
UsePublishedSubject = usePublishedSubject,
AllowTrace = allowTrace,
});
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Checks if stream import duplicate exists. Lock should be held.
/// Mirrors Go <c>(a *Account) isStreamImportDuplicate(...)</c>.
/// </summary>
internal bool IsStreamImportDuplicate(Account account, string from)
{
if (Imports.Streams == null)
return false;
foreach (var streamImport in Imports.Streams)
{
if (ReferenceEquals(streamImport.Account, account) && streamImport.From == from)
return true;
}
return false;
}
/// <summary>
/// Adds stream import from a specific account.
/// Mirrors Go <c>(a *Account) AddStreamImport(...)</c>.
/// </summary>
public Exception? AddStreamImport(Account account, string from, string prefix) =>
AddStreamImportWithClaimInternal(account, from, prefix, false, null);
/// <summary>
/// Adds stream export, optionally restricted to explicit accounts.
/// Mirrors Go <c>(a *Account) AddStreamExport(...)</c>.
/// </summary>
public Exception? AddStreamExport(string subject, IReadOnlyList<Account>? accounts = null) =>
AddStreamExportWithAccountPos(subject, accounts, 0);
/// <summary>
/// Adds stream export with account-position matching.
/// Mirrors Go <c>(a *Account) addStreamExportWithAccountPos(...)</c>.
/// </summary>
public Exception? AddStreamExportWithAccountPos(string subject, IReadOnlyList<Account>? accounts, uint accountPos)
{
if (!SubscriptionIndex.IsValidSubject(subject))
return ServerErrors.ErrBadSubject;
_mu.EnterWriteLock();
try
{
Exports.Streams ??= new Dictionary<string, StreamExport>(StringComparer.Ordinal);
Exports.Streams.TryGetValue(subject, out var export);
export ??= new StreamExport();
if (accounts != null || accountPos > 0)
{
var authErr = SetExportAuth(export, subject, accounts, accountPos);
if (authErr != null)
return authErr;
}
Exports.Streams[subject] = export;
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Checks stream import authorization with account lock.
/// Mirrors Go <c>(a *Account) checkStreamImportAuthorized(...)</c>.
/// </summary>
internal bool CheckStreamImportAuthorized(Account account, string subject, object? importClaim)
{
_mu.EnterReadLock();
try { return CheckStreamImportAuthorizedNoLock(account, subject, importClaim); }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Checks stream import authorization assuming lock is already held.
/// Mirrors Go <c>(a *Account) checkStreamImportAuthorizedNoLock(...)</c>.
/// </summary>
internal bool CheckStreamImportAuthorizedNoLock(Account account, string subject, object? importClaim)
{
if (Exports.Streams == null || !SubscriptionIndex.IsValidSubject(subject))
return false;
return CheckStreamExportApproved(account, subject, importClaim);
}
/// <summary>
/// Gets wildcard-matching service export for subject.
/// Lock should be held.
/// Mirrors Go <c>(a *Account) getWildcardServiceExport(from string)</c>.
/// </summary>
internal ServiceExportEntry? GetWildcardServiceExport(string from)
{
if (Exports.Services == null)
return null;
var tokens = SubjectTransform.TokenizeSubject(from);
foreach (var (subject, serviceExport) in Exports.Services)
{
if (SubjectTransform.IsSubsetMatch(tokens, subject))
return serviceExport;
}
return null;
}
/// <summary>
/// Handles stream import activation expiration.
/// Mirrors Go <c>(a *Account) streamActivationExpired(...)</c>.
/// </summary>
internal void StreamActivationExpired(Account exportAccount, string subject)
{
_mu.EnterWriteLock();
try
{
if (IsExpired() || Imports.Streams == null)
return;
foreach (var streamImport in Imports.Streams)
{
if (ReferenceEquals(streamImport.Account, exportAccount) && streamImport.From == subject)
{
streamImport.Invalid = true;
return;
}
}
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Handles service import activation expiration.
/// Mirrors Go <c>(a *Account) serviceActivationExpired(...)</c>.
/// </summary>
internal void ServiceActivationExpired(Account destinationAccount, string subject)
{
_mu.EnterWriteLock();
try
{
if (IsExpired() || Imports.Services == null)
return;
var serviceImport = GetServiceImportForAccountLocked(destinationAccount.Name, subject);
if (serviceImport != null)
serviceImport.Invalid = true;
}
finally
{
_mu.ExitWriteLock();
}
}
// -------------------------------------------------------------------------
// Export checks
// -------------------------------------------------------------------------

Binary file not shown.